Posted to dev@spark.apache.org by Sam Elamin <hu...@gmail.com> on 2017/02/01 23:25:56 UTC

Structured Streaming Schema Issue

Hi All

I am writing a bigquery connector here
<http://github.com/samelamin/spark-bigquery> and I am getting a strange
error with schemas being overwritten when a dataframe is passed over to the
Sink


for example the source returns this StructType
WARN streaming.BigQuerySource:
StructType(StructField(customerid,LongType,true),

and the sink is receiving this StructType
WARN streaming.BigQuerySink:
StructType(StructField(customerid,StringType,true)


Any idea why this might be happening?
I don't have schema inference on:

spark.conf.set("spark.sql.streaming.schemaInference", "false")

I know it's off by default, but I set it just to be sure.

So I'm completely lost as to what could be causing this.
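(For reference, one way to rule out inference on the read side entirely is to pin the schema explicitly on readStream. A rough sketch; the format name and fields below are illustrative, not the actual connector code:)

import org.apache.spark.sql.types._

// Sketch: declare the expected schema up front so nothing downstream can
// re-infer it. Format name and fields are illustrative.
val expectedSchema = StructType(Seq(
  StructField("customerid", LongType, nullable = true),
  StructField("id", StringType, nullable = true)))

val df = spark.readStream
  .format("com.samelamin.spark.bigquery")   // illustrative format name
  .schema(expectedSchema)
  .load()

df.printSchema()   // customerid should stay LongType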

Regards
Sam

Re: Structured Streaming Schema Issue

Posted by Sam Elamin <hu...@gmail.com>.
Hey TD


I figured out what was happening

My source would return the correct schema, but the schema on the returned df
was actually different. I'm loading JSON data from cloud storage, and that
schema gets inferred instead of set.

So basically the schema I return from the source provider wasn't actually
being used.
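In other words, the batch df inside getBatch needs to be built with the schema I already have. A minimal sketch of the idea, assuming the batch is loaded with sqlContext.read.json from the staging bucket (the path and names are illustrative):

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.types.StructType

// Sketch: build the batch DataFrame with the schema the source already
// declares, instead of letting the JSON reader infer one from the files.
// The GCS path is illustrative; declaredSchema stands in for Source.schema.
def loadBatch(sqlContext: SQLContext, declaredSchema: StructType): DataFrame =
  sqlContext.read
    .schema(declaredSchema)   // pin the schema, no inference
    .json("gs://my-bucket/hadoop/tmp/bigquery/shard-*/data-*.json")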

Thanks again for your help. Out of interest, is there a way of debugging
this in an IDE? What do you recommend? I've been adding far too many
debug statements just to understand what's happening!

Regards
Sam
On Fri, 3 Feb 2017 at 13:19, Tathagata Das <ta...@gmail.com>
wrote:

> Hard to say what is going on without really understanding how you are
> creating the DF in Source.getBatch(). Yes, there may be another inference
> in getBatch, where you have to be careful that you don't infer a schema that
> is different from the schema inferred earlier (when creating the streaming DF
> using readStream...load()). How about printing the schema of the batch DF
> before returning from getBatch? At least that would narrow down the possible
> location of the problem - in getBatch, OR after getBatch returns + before
> addBatch is called, OR after addBatch is called.
>
>
>
> On Thu, Feb 2, 2017 at 7:30 AM, Sam Elamin <hu...@gmail.com>
> wrote:
>
> Hi All
>
> Ive done a bit more digging to where exactly this happens. It seems like
> the schema is infered again after the data leaves the source and then comes
> into the sink
>
> Below is a stack trace, the schema at the BigQuerySource has a LongType
> for customer id but then at the sink, the data received has an incorrect
> schema
>
> where exactly is the data stored in between these steps? Shouldnt the sink
> call the Schema method from Source?
>
> If it helps I want to clarify that I am not passing in the schema when i
> initialise the readStream, but I am overriding the sourceSchema in my
> class that extends StreamSourceProvider. But even when that method is
> called the schema is correct
>
> p.s Ignore the "production" bucket, thats just a storage bucket I am
> using, not actually using structured streaming in production just yet :)
>
>
> 17/02/02 14:57:22 WARN streaming.BigQuerySource:
> StructType(StructField(customerid,LongType,true),
> StructField(id,StringType,true), StructField(legacyorderid,LongType,true),
> StructField(notifiycustomer,BooleanType,true),
> StructField(ordercontainer,StructType(StructField(applicationinfo,StructType(StructField(applicationname,StringType,true),
> StructField(applicationversion,StringType,true),
> StructField(clientip,StringType,true),
> StructField(jefeature,StringType,true),
> StructField(useragent,StringType,true)),true),
> StructField(basketinfo,StructType(StructField(basketid,StringType,true),
> StructField(deliverycharge,FloatType,true),
> StructField(discount,FloatType,true),
> StructField(discounts,StringType,true),
> StructField(items,StructType(StructField(combinedprice,FloatType,true),
> StructField(description,StringType,true),
> StructField(discounts,StringType,true),
> StructField(mealparts,StringType,true),
> StructField(menucardnumber,StringType,true),
> StructField(multibuydiscounts,StringType,true),
> StructField(name,StringType,true),
> StructField(optionalaccessories,StringType,true),
> StructField(productid,LongType,true),
> StructField(producttypeid,LongType,true),
> StructField(requiredaccessories,StructType(StructField(groupid,LongType,true),
> StructField(name,StringType,true),
> StructField(requiredaccessoryid,LongType,true),
> StructField(unitprice,FloatType,true)),true),
> StructField(synonym,StringType,true),
> StructField(unitprice,FloatType,true)),true),
> StructField(menuid,LongType,true),
> StructField(multibuydiscount,FloatType,true),
> StructField(subtotal,FloatType,true), StructField(tospend,FloatType,true),
> StructField(total,FloatType,true)),true),
> StructField(customerinfo,StructType(StructField(address,StringType,true),
> StructField(city,StringType,true), StructField(email,StringType,true),
> StructField(id,StringType,true), StructField(name,StringType,true),
> StructField(phonenumber,StringType,true),
> StructField(postcode,StringType,true),
> StructField(previousjeordercount,LongType,true),
> StructField(previousrestuarantordercount,LongType,true),
> StructField(timezone,StringType,true)),true),
> StructField(id,StringType,true), StructField(islocked,BooleanType,true),
> StructField(legacyid,LongType,true),
> StructField(order,StructType(StructField(duedate,StringType,true),
> StructField(duedatewithutcoffset,StringType,true),
> StructField(initialduedate,StringType,true),
> StructField(initialduedatewithutcoffset,StringType,true),
> StructField(notetorestaurant,StringType,true),
> StructField(orderable,BooleanType,true),
> StructField(placeddate,StringType,true),
> StructField(promptasap,BooleanType,true),
> StructField(servicetype,StringType,true)),true),
> StructField(paymentinfo,StructType(StructField(drivertipvalue,FloatType,true),
> StructField(orderid,StringType,true),
> StructField(paiddate,StringType,true),
> StructField(paymentlines,StructType(StructField(cardfee,FloatType,true),
> StructField(cardtype,StringType,true),
> StructField(paymenttransactionref,StringType,true),
> StructField(pspname,StringType,true), StructField(type,StringType,true),
> StructField(value,FloatType,true)),true),
> StructField(total,FloatType,true),
> StructField(totalcomplementary,FloatType,true)),true),
> StructField(restaurantinfo,StructType(StructField(addresslines,StringType,true),
> StructField(city,StringType,true),
> StructField(dispatchmethod,StringType,true),
> StructField(id,StringType,true), StructField(latitude,FloatType,true),
> StructField(longitude,FloatType,true), StructField(name,StringType,true),
> StructField(offline,BooleanType,true),
> StructField(phonenumber,StringType,true),
> StructField(postcode,StringType,true),
> StructField(seoname,StringType,true),
> StructField(tempoffline,BooleanType,true)),true)),true),
> StructField(orderid,StringType,true),
> StructField(orderresolutionstatus,StringType,true),
> StructField(raisingcomponent,StringType,true),
> StructField(restaurantid,LongType,true),
> StructField(tenant,StringType,true), StructField(timestamp,StringType,true))
> 17/02/02 14:57:23 INFO storage.BlockManagerInfo: Removed
> broadcast_1_piece0 on 127.0.0.1:51379 in memory (size: 4.8 KB, free:
> 366.3 MB)
> 17/02/02 14:57:23 INFO spark.ContextCleaner: Cleaned accumulator 66
> 17/02/02 14:57:23 INFO codegen.CodeGenerator: Code generated in 194.977984
> ms
> 17/02/02 14:57:23 INFO codegen.CodeGenerator: Code generated in 10.736863
> ms
> 17/02/02 14:57:23 INFO spark.SparkContext: Starting job: start at
> EventStreamer.scala:61
> 17/02/02 14:57:23 INFO scheduler.DAGScheduler: Registering RDD 6 (start at
> EventStreamer.scala:61)
> 17/02/02 14:57:23 INFO scheduler.DAGScheduler: Got job 1 (start at
> EventStreamer.scala:61) with 1 output partitions
> 17/02/02 14:57:23 INFO scheduler.DAGScheduler: Final stage: ResultStage 2
> (start at EventStreamer.scala:61)
> 17/02/02 14:57:23 INFO scheduler.DAGScheduler: Parents of final stage:
> List(ShuffleMapStage 1)
> 17/02/02 14:57:23 INFO scheduler.DAGScheduler: Missing parents:
> List(ShuffleMapStage 1)
> 17/02/02 14:57:23 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage
> 1 (MapPartitionsRDD[6] at start at EventStreamer.scala:61), which has no
> missing parents
> 17/02/02 14:57:23 INFO memory.MemoryStore: Block broadcast_2 stored as
> values in memory (estimated size 17.3 KB, free 366.0 MB)
> 17/02/02 14:57:23 INFO memory.MemoryStore: Block broadcast_2_piece0 stored
> as bytes in memory (estimated size 9.6 KB, free 366.0 MB)
> 17/02/02 14:57:23 INFO storage.BlockManagerInfo: Added broadcast_2_piece0
> in memory on 127.0.0.1:51379 (size: 9.6 KB, free: 366.3 MB)
> 17/02/02 14:57:23 INFO spark.SparkContext: Created broadcast 2 from
> broadcast at DAGScheduler.scala:1012
> 17/02/02 14:57:23 INFO scheduler.DAGScheduler: Submitting 2 missing tasks
> from ShuffleMapStage 1 (MapPartitionsRDD[6] at start at
> EventStreamer.scala:61)
> 17/02/02 14:57:23 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0
> with 2 tasks
> 17/02/02 14:57:23 INFO scheduler.TaskSetManager: Starting task 0.0 in
> stage 1.0 (TID 2, localhost, partition 0, PROCESS_LOCAL, 6444 bytes)
> 17/02/02 14:57:23 INFO scheduler.TaskSetManager: Starting task 1.0 in
> stage 1.0 (TID 3, localhost, partition 1, PROCESS_LOCAL, 6444 bytes)
> 17/02/02 14:57:23 INFO executor.Executor: Running task 0.0 in stage 1.0
> (TID 2)
> 17/02/02 14:57:23 INFO executor.Executor: Running task 1.0 in stage 1.0
> (TID 3)
> 17/02/02 14:57:23 INFO rdd.NewHadoopRDD: Input split:
> gs://je-bi-production/hadoop/tmp/bigquery/job_201702021456_0000/shard-1/data-*.json[3
> estimated records]
> 17/02/02 14:57:23 INFO rdd.NewHadoopRDD: Input split:
> gs://je-bi-production/hadoop/tmp/bigquery/job_201702021456_0000/shard-0/data-*.json[3
> estimated records]
> 17/02/02 14:57:23 INFO bigquery.DynamicFileListRecordReader: Initializing
> DynamicFileListRecordReader with split 'InputSplit:: length:3 locations: []
> toString():
> gs://je-bi-production/hadoop/tmp/bigquery/job_201702021456_0000/shard-1/data-*.json[3
> estimated records]', task context 'TaskAttemptContext::
> TaskAttemptID:attempt_201702021456_0000_m_000001_0 Status:'
> 17/02/02 14:57:23 INFO bigquery.DynamicFileListRecordReader: Initializing
> DynamicFileListRecordReader with split 'InputSplit:: length:3 locations: []
> toString():
> gs://je-bi-production/hadoop/tmp/bigquery/job_201702021456_0000/shard-0/data-*.json[3
> estimated records]', task context 'TaskAttemptContext::
> TaskAttemptID:attempt_201702021456_0000_m_000000_0 Status:'
> 17/02/02 14:57:23 INFO codegen.CodeGenerator: Code generated in 92.93187 ms
> 17/02/02 14:57:23 INFO bigquery.DynamicFileListRecordReader: Adding new
> file 'data-000000000000.json' of size 0 to knownFileSet.
> 17/02/02 14:57:23 INFO bigquery.DynamicFileListRecordReader: Moving to
> next file
> 'gs://je-bi-production/hadoop/tmp/bigquery/job_201702021456_0000/shard-1/data-000000000000.json'
> which has 0 bytes. Records read so far: 0
> 17/02/02 14:57:23 INFO bigquery.DynamicFileListRecordReader: Adding new
> file 'data-000000000000.json' of size 20424 to knownFileSet.
> 17/02/02 14:57:23 INFO bigquery.DynamicFileListRecordReader: Adding new
> file 'data-000000000001.json' of size 0 to knownFileSet.
> 17/02/02 14:57:23 INFO bigquery.DynamicFileListRecordReader: Moving to
> next file
> 'gs://je-bi-production/hadoop/tmp/bigquery/job_201702021456_0000/shard-0/data-000000000000.json'
> which has 20424 bytes. Records read so far: 0
> 17/02/02 14:57:24 INFO gcsio.GoogleCloudStorageReadChannel: Got 'range not
> satisfiable' for reading
> gs://je-bi-production/hadoop/tmp/bigquery/job_201702021456_0000/shard-1/data-000000000000.json
> at position 0; assuming empty.
> 17/02/02 14:57:24 INFO bigquery.DynamicFileListRecordReader: Found
> end-marker file 'data-000000000000.json' with index 0
> 17/02/02 14:57:24 INFO executor.Executor: Finished task 1.0 in stage 1.0
> (TID 3). 1723 bytes result sent to driver
> 17/02/02 14:57:24 INFO scheduler.TaskSetManager: Finished task 1.0 in
> stage 1.0 (TID 3) in 844 ms on localhost (1/2)
> 17/02/02 14:57:24 INFO bigquery.DynamicFileListRecordReader: Moving to
> next file
> 'gs://je-bi-production/hadoop/tmp/bigquery/job_201702021456_0000/shard-0/data-000000000001.json'
> which has 0 bytes. Records read so far: 6
> 17/02/02 14:57:24 INFO gcsio.GoogleCloudStorageReadChannel: Got 'range not
> satisfiable' for reading
> gs://je-bi-production/hadoop/tmp/bigquery/job_201702021456_0000/shard-0/data-000000000001.json
> at position 0; assuming empty.
> 17/02/02 14:57:24 INFO bigquery.DynamicFileListRecordReader: Found
> end-marker file 'data-000000000001.json' with index 1
> 17/02/02 14:57:24 INFO executor.Executor: Finished task 0.0 in stage 1.0
> (TID 2). 1717 bytes result sent to driver
> 17/02/02 14:57:24 INFO scheduler.TaskSetManager: Finished task 0.0 in
> stage 1.0 (TID 2) in 1344 ms on localhost (2/2)
> 17/02/02 14:57:24 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0,
> whose tasks have all completed, from pool
> 17/02/02 14:57:24 INFO scheduler.DAGScheduler: ShuffleMapStage 1 (start at
> EventStreamer.scala:61) finished in 1.344 s
> 17/02/02 14:57:24 INFO scheduler.DAGScheduler: looking for newly runnable
> stages
> 17/02/02 14:57:24 INFO scheduler.DAGScheduler: running: Set()
> 17/02/02 14:57:24 INFO scheduler.DAGScheduler: waiting: Set(ResultStage 2)
> 17/02/02 14:57:24 INFO scheduler.DAGScheduler: failed: Set()
> 17/02/02 14:57:24 INFO scheduler.DAGScheduler: Submitting ResultStage 2
> (MapPartitionsRDD[9] at start at EventStreamer.scala:61), which has no
> missing parents
> 17/02/02 14:57:24 INFO memory.MemoryStore: Block broadcast_3 stored as
> values in memory (estimated size 7.0 KB, free 366.0 MB)
> 17/02/02 14:57:24 INFO memory.MemoryStore: Block broadcast_3_piece0 stored
> as bytes in memory (estimated size 3.7 KB, free 366.0 MB)
> 17/02/02 14:57:24 INFO storage.BlockManagerInfo: Added broadcast_3_piece0
> in memory on 127.0.0.1:51379 (size: 3.7 KB, free: 366.3 MB)
> 17/02/02 14:57:24 INFO spark.SparkContext: Created broadcast 3 from
> broadcast at DAGScheduler.scala:1012
> 17/02/02 14:57:24 INFO scheduler.DAGScheduler: Submitting 1 missing tasks
> from ResultStage 2 (MapPartitionsRDD[9] at start at EventStreamer.scala:61)
> 17/02/02 14:57:24 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0
> with 1 tasks
> 17/02/02 14:57:24 INFO scheduler.TaskSetManager: Starting task 0.0 in
> stage 2.0 (TID 4, localhost, partition 0, ANY, 6233 bytes)
> 17/02/02 14:57:24 INFO executor.Executor: Running task 0.0 in stage 2.0
> (TID 4)
> 17/02/02 14:57:24 INFO storage.ShuffleBlockFetcherIterator: Getting 2
> non-empty blocks out of 2 blocks
> 17/02/02 14:57:24 INFO storage.ShuffleBlockFetcherIterator: Started 0
> remote fetches in 5 ms
> 17/02/02 14:57:24 INFO executor.Executor: Finished task 0.0 in stage 2.0
> (TID 4). 1873 bytes result sent to driver
> 17/02/02 14:57:24 INFO scheduler.TaskSetManager: Finished task 0.0 in
> stage 2.0 (TID 4) in 37 ms on localhost (1/1)
> 17/02/02 14:57:24 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0,
> whose tasks have all completed, from pool
> 17/02/02 14:57:24 INFO scheduler.DAGScheduler: ResultStage 2 (start at
> EventStreamer.scala:61) finished in 0.037 s
> 17/02/02 14:57:24 INFO scheduler.DAGScheduler: Job 1 finished: start at
> EventStreamer.scala:61, took 1.420357 s
> 17/02/02 14:57:24 INFO codegen.CodeGenerator: Code generated in 7.668964 ms
> 17/02/02 14:57:24 WARN streaming.BigQuerySink: ************ saving schema
> is set to
> 17/02/02 14:57:24 WARN streaming.BigQuerySink:
> StructType(StructField(customerid,StringType,true),
> StructField(id,StringType,true),
> StructField(legacyorderid,StringType,true),
> StructField(notifiycustomer,BooleanType,true),
> StructField(ordercontainer,StructType(StructField(applicationinfo,StructType(StructField(applicationname,StringType,true),
> StructField(applicationversion,StringType,true),
> StructField(clientip,StringType,true),
> StructField(jefeature,StringType,true),
> StructField(useragent,StringType,true)),true),
> StructField(basketinfo,StructType(StructField(basketid,StringType,true),
> StructField(deliverycharge,LongType,true),
> StructField(discount,LongType,true),
> StructField(discounts,ArrayType(StringType,true),true),
> StructField(items,ArrayType(StructType(StructField(combinedprice,DoubleType,true),
> StructField(description,StringType,true),
> StructField(discounts,ArrayType(StringType,true),true),
> StructField(mealparts,ArrayType(StringType,true),true),
> StructField(menucardnumber,StringType,true),
> StructField(multibuydiscounts,ArrayType(StringType,true),true),
> StructField(name,StringType,true),
> StructField(optionalaccessories,ArrayType(StringType,true),true),
> StructField(productid,StringType,true),
> StructField(producttypeid,StringType,true),
> StructField(requiredaccessories,ArrayType(StructType(StructField(groupid,StringType,true),
> StructField(name,StringType,true),
> StructField(requiredaccessoryid,StringType,true),
> StructField(unitprice,LongType,true)),true),true),
> StructField(synonym,StringType,true),
> StructField(unitprice,DoubleType,true)),true),true),
> StructField(menuid,StringType,true),
> StructField(multibuydiscount,LongType,true),
> StructField(subtotal,DoubleType,true), StructField(tospend,LongType,true),
> StructField(total,DoubleType,true)),true),
> StructField(customerinfo,StructType(StructField(address,StringType,true),
> StructField(city,StringType,true), StructField(email,StringType,true),
> StructField(id,StringType,true), StructField(name,StringType,true),
> StructField(phonenumber,StringType,true),
> StructField(postcode,StringType,true),
> StructField(previousjeordercount,StringType,true),
> StructField(previousrestuarantordercount,StringType,true),
> StructField(timezone,StringType,true)),true),
> StructField(id,StringType,true), StructField(islocked,BooleanType,true),
> StructField(legacyid,StringType,true),
> StructField(order,StructType(StructField(duedate,StringType,true),
> StructField(duedatewithutcoffset,StringType,true),
> StructField(initialduedate,StringType,true),
> StructField(initialduedatewithutcoffset,StringType,true),
> StructField(notetorestaurant,StringType,true),
> StructField(orderable,BooleanType,true),
> StructField(placeddate,StringType,true),
> StructField(promptasap,BooleanType,true),
> StructField(servicetype,StringType,true)),true),
> StructField(paymentinfo,StructType(StructField(drivertipvalue,LongType,true),
> StructField(orderid,StringType,true),
> StructField(paiddate,StringType,true),
> StructField(paymentlines,ArrayType(StructType(StructField(cardfee,DoubleType,true),
> StructField(cardtype,StringType,true),
> StructField(paymenttransactionref,StringType,true),
> StructField(pspname,StringType,true), StructField(type,StringType,true),
> StructField(value,DoubleType,true)),true),true),
> StructField(total,DoubleType,true),
> StructField(totalcomplementary,LongType,true)),true),
> StructField(restaurantinfo,StructType(StructField(addresslines,ArrayType(StringType,true),true),
> StructField(city,StringType,true),
> StructField(dispatchmethod,StringType,true),
> StructField(id,StringType,true), StructField(latitude,DoubleType,true),
> StructField(longitude,DoubleType,true), StructField(name,StringType,true),
> StructField(offline,BooleanType,true),
> StructField(phonenumber,StringType,true),
> StructField(postcode,StringType,true),
> StructField(seoname,StringType,true),
> StructField(tempoffline,BooleanType,true)),true)),true),
> StructField(orderid,StringType,true),
> StructField(orderresolutionstatus,StringType,true),
> StructField(raisingcomponent,StringType,true),
> StructField(restaurantid,StringType,true),
> StructField(tenant,StringType,true), StructField(timestamp,StringType,true))
>
>
> On Thu, Feb 2, 2017 at 12:04 AM, Sam Elamin <hu...@gmail.com>
> wrote:
>
> There isn't a query per se.im writing the entire dataframe from the
> output of the read stream. Once I got that working I was planning to test
> the query aspect
>
>
> I'll do a bit more digging. Thank you very much for your help. Structued
> streaming is very exciting and I really am enjoying writing a connector for
> it!
>
> Regards
> Sam
> On Thu, 2 Feb 2017 at 00:02, Tathagata Das <ta...@gmail.com>
> wrote:
>
> What is the query you are apply writeStream on? Essentially can you print
> the whole query.
>
> Also, you can do StreamingQuery.explain() to see in full details how the
> logical plan changes to physical plan, for a batch of data. that might
> help. try doing that with some other sink to make sure the source works
> correctly, and then try using your sink.
>
> If you want further debugging, then you will have to dig into the
> StreamingExecution class in Spark, and debug stuff there.
>
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L523
>
> On Wed, Feb 1, 2017 at 3:49 PM, Sam Elamin <hu...@gmail.com>
> wrote:
>
> Yeah sorry Im still working on it, its on a branch you can find here
> <https://github.com/samelamin/spark-bigquery/blob/Stream_reading/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySource.scala>,
> ignore the logging messages I was trying to workout how the APIs work and
> unfortunately because I have to shade the dependency I cant debug it in an
> IDE (that I know of! )
>
> So I can see the correct schema here
> <https://github.com/samelamin/spark-bigquery/blob/Stream_reading/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySource.scala#L64> and
> also when the df is returned(After .load() )
>
> But when that same df has writeStream applied to it, the addBatch
> dataframe has a new schema. Its similar to the old schema but some ints
> have been turned to strings.
>
>
>
> On Wed, Feb 1, 2017 at 11:40 PM, Tathagata Das <
> tathagata.das1565@gmail.com> wrote:
>
> I am assuming that you have written your own BigQuerySource (i dont see
> that code in the link you posted). In that source, you must have
> implemented getBatch which uses offsets to return the Dataframe having the
> data of a batch. Can you double check when this DataFrame returned by
> getBatch, has the expected schema?
>
> On Wed, Feb 1, 2017 at 3:33 PM, Sam Elamin <hu...@gmail.com>
> wrote:
>
> Thanks for the quick response TD!
>
> Ive been trying to identify where exactly this transformation happens
>
> The readStream returns a dataframe with the correct schema
>
> The minute I call writeStream, by the time I get to the addBatch method,
> the dataframe there has an incorrect Schema
>
> So Im skeptical about the issue being prior to the readStream since the
> output dataframe has the correct Schema
>
>
> Am I missing something completely obvious?
>
> Regards
> Sam
>
> On Wed, Feb 1, 2017 at 11:29 PM, Tathagata Das <
> tathagata.das1565@gmail.com> wrote:
>
> You should make sure that schema of the streaming Dataset returned by
> `readStream`, and the schema of the DataFrame returned by the sources
> getBatch.
>
> On Wed, Feb 1, 2017 at 3:25 PM, Sam Elamin <hu...@gmail.com>
> wrote:
>
> Hi All
>
> I am writing a bigquery connector here
> <http://github.com/samelamin/spark-bigquery> and I am getting a strange
> error with schemas being overwritten when a dataframe is passed over to the
> Sink
>
>
> for example the source returns this StructType
> WARN streaming.BigQuerySource:
> StructType(StructField(customerid,LongType,true),
>
> and the sink is recieving this StructType
> WARN streaming.BigQuerySink:
> StructType(StructField(customerid,StringType,true)
>
>
> Any idea why this might be happening?
> I dont have infering schema on
>
> spark.conf.set("spark.sql.streaming.schemaInference", "false")
>
> I know its off by default but I set it just to be sure
>
> So completely lost to what could be causing this
>
> Regards
> Sam
>
>
>
>
>
>
>
>
>

Re: Structured Streaming Schema Issue

Posted by Sam Elamin <hu...@gmail.com>.
Hi All

I've done a bit more digging into where exactly this happens. It seems like
the schema is inferred again after the data leaves the source and before it
comes into the sink.

Below is a log trace; the schema at the BigQuerySource has a LongType for
customerid, but at the sink the data received has an incorrect schema.

Where exactly is the data stored in between these steps? Shouldn't the sink
call the schema method from the Source?

If it helps, I want to clarify that I am not passing in the schema when I
initialise the readStream, but I am overriding sourceSchema in my class
that extends StreamSourceProvider. Even when that method is called, the
schema is correct.
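Roughly what I mean by overriding sourceSchema, as a trimmed sketch (not the actual connector code; the hard-coded schema and source name are illustrative, and createSource is omitted):

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Source
import org.apache.spark.sql.sources.StreamSourceProvider
import org.apache.spark.sql.types._

// Trimmed sketch of a provider reporting a fixed schema.
class BigQuerySourceProvider extends StreamSourceProvider {

  private val bqSchema = StructType(Seq(
    StructField("customerid", LongType, nullable = true),
    StructField("id", StringType, nullable = true)))

  override def sourceSchema(
      sqlContext: SQLContext,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): (String, StructType) =
    ("bigquery", schema.getOrElse(bqSchema))

  override def createSource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): Source = ???   // omitted for brevity
}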

P.S. Ignore the "production" bucket; that's just a storage bucket I am using.
I'm not actually using Structured Streaming in production just yet :)


17/02/02 14:57:22 WARN streaming.BigQuerySource:
StructType(StructField(customerid,LongType,true),
StructField(id,StringType,true), StructField(legacyorderid,LongType,true),
StructField(notifiycustomer,BooleanType,true), StructField(ordercontainer,
StructType(StructField(applicationinfo,StructType(
StructField(applicationname,StringType,true), StructField(
applicationversion,StringType,true), StructField(clientip,StringType,true),
StructField(jefeature,StringType,true),
StructField(useragent,StringType,true)),true),
StructField(basketinfo,StructType(StructField(basketid,StringType,true),
StructField(deliverycharge,FloatType,true),
StructField(discount,FloatType,true),
StructField(discounts,StringType,true), StructField(items,StructType(
StructField(combinedprice,FloatType,true),
StructField(description,StringType,true),
StructField(discounts,StringType,true), StructField(mealparts,StringType,true),
StructField(menucardnumber,StringType,true),
StructField(multibuydiscounts,StringType,true),
StructField(name,StringType,true),
StructField(optionalaccessories,StringType,true),
StructField(productid,LongType,true), StructField(producttypeid,LongType,true),
StructField(requiredaccessories,StructType(StructField(groupid,LongType,true),
StructField(name,StringType,true),
StructField(requiredaccessoryid,LongType,true),
StructField(unitprice,FloatType,true)),true),
StructField(synonym,StringType,true),
StructField(unitprice,FloatType,true)),true),
StructField(menuid,LongType,true),
StructField(multibuydiscount,FloatType,true),
StructField(subtotal,FloatType,true),
StructField(tospend,FloatType,true), StructField(total,FloatType,true)),true),
StructField(customerinfo,StructType(StructField(address,StringType,true),
StructField(city,StringType,true), StructField(email,StringType,true),
StructField(id,StringType,true), StructField(name,StringType,true),
StructField(phonenumber,StringType,true),
StructField(postcode,StringType,true),
StructField(previousjeordercount,LongType,true), StructField(
previousrestuarantordercount,LongType,true),
StructField(timezone,StringType,true)),true),
StructField(id,StringType,true), StructField(islocked,BooleanType,true),
StructField(legacyid,LongType,true), StructField(order,StructType(
StructField(duedate,StringType,true),
StructField(duedatewithutcoffset,StringType,true),
StructField(initialduedate,StringType,true), StructField(
initialduedatewithutcoffset,StringType,true),
StructField(notetorestaurant,StringType,true),
StructField(orderable,BooleanType,true),
StructField(placeddate,StringType,true),
StructField(promptasap,BooleanType,true),
StructField(servicetype,StringType,true)),true),
StructField(paymentinfo,StructType(StructField(
drivertipvalue,FloatType,true), StructField(orderid,StringType,true),
StructField(paiddate,StringType,true), StructField(paymentlines,
StructType(StructField(cardfee,FloatType,true),
StructField(cardtype,StringType,true),
StructField(paymenttransactionref,StringType,true),
StructField(pspname,StringType,true),
StructField(type,StringType,true), StructField(value,FloatType,true)),true),
StructField(total,FloatType,true),
StructField(totalcomplementary,FloatType,true)),true),
StructField(restaurantinfo,StructType(StructField(addresslines,StringType,true),
StructField(city,StringType,true), StructField(dispatchmethod,StringType,true),
StructField(id,StringType,true), StructField(latitude,FloatType,true),
StructField(longitude,FloatType,true), StructField(name,StringType,true),
StructField(offline,BooleanType,true),
StructField(phonenumber,StringType,true),
StructField(postcode,StringType,true), StructField(seoname,StringType,true),
StructField(tempoffline,BooleanType,true)),true)),true),
StructField(orderid,StringType,true),
StructField(orderresolutionstatus,StringType,true),
StructField(raisingcomponent,StringType,true),
StructField(restaurantid,LongType,true),
StructField(tenant,StringType,true), StructField(timestamp,StringType,true))
17/02/02 14:57:23 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0
on 127.0.0.1:51379 in memory (size: 4.8 KB, free: 366.3 MB)
17/02/02 14:57:23 INFO spark.ContextCleaner: Cleaned accumulator 66
17/02/02 14:57:23 INFO codegen.CodeGenerator: Code generated in 194.977984
ms
17/02/02 14:57:23 INFO codegen.CodeGenerator: Code generated in 10.736863 ms
17/02/02 14:57:23 INFO spark.SparkContext: Starting job: start at
EventStreamer.scala:61
17/02/02 14:57:23 INFO scheduler.DAGScheduler: Registering RDD 6 (start at
EventStreamer.scala:61)
17/02/02 14:57:23 INFO scheduler.DAGScheduler: Got job 1 (start at
EventStreamer.scala:61) with 1 output partitions
17/02/02 14:57:23 INFO scheduler.DAGScheduler: Final stage: ResultStage 2
(start at EventStreamer.scala:61)
17/02/02 14:57:23 INFO scheduler.DAGScheduler: Parents of final stage:
List(ShuffleMapStage 1)
17/02/02 14:57:23 INFO scheduler.DAGScheduler: Missing parents:
List(ShuffleMapStage 1)
17/02/02 14:57:23 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 1
(MapPartitionsRDD[6] at start at EventStreamer.scala:61), which has no
missing parents
17/02/02 14:57:23 INFO memory.MemoryStore: Block broadcast_2 stored as
values in memory (estimated size 17.3 KB, free 366.0 MB)
17/02/02 14:57:23 INFO memory.MemoryStore: Block broadcast_2_piece0 stored
as bytes in memory (estimated size 9.6 KB, free 366.0 MB)
17/02/02 14:57:23 INFO storage.BlockManagerInfo: Added broadcast_2_piece0
in memory on 127.0.0.1:51379 (size: 9.6 KB, free: 366.3 MB)
17/02/02 14:57:23 INFO spark.SparkContext: Created broadcast 2 from
broadcast at DAGScheduler.scala:1012
17/02/02 14:57:23 INFO scheduler.DAGScheduler: Submitting 2 missing tasks
from ShuffleMapStage 1 (MapPartitionsRDD[6] at start at
EventStreamer.scala:61)
17/02/02 14:57:23 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0
with 2 tasks
17/02/02 14:57:23 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
1.0 (TID 2, localhost, partition 0, PROCESS_LOCAL, 6444 bytes)
17/02/02 14:57:23 INFO scheduler.TaskSetManager: Starting task 1.0 in stage
1.0 (TID 3, localhost, partition 1, PROCESS_LOCAL, 6444 bytes)
17/02/02 14:57:23 INFO executor.Executor: Running task 0.0 in stage 1.0
(TID 2)
17/02/02 14:57:23 INFO executor.Executor: Running task 1.0 in stage 1.0
(TID 3)
17/02/02 14:57:23 INFO rdd.NewHadoopRDD: Input split:
gs://je-bi-production/hadoop/tmp/bigquery/job_201702021456_0000/shard-1/data-*.json[3
estimated records]
17/02/02 14:57:23 INFO rdd.NewHadoopRDD: Input split:
gs://je-bi-production/hadoop/tmp/bigquery/job_201702021456_0000/shard-0/data-*.json[3
estimated records]
17/02/02 14:57:23 INFO bigquery.DynamicFileListRecordReader: Initializing
DynamicFileListRecordReader with split 'InputSplit:: length:3 locations: []
toString(): gs://je-bi-production/hadoop/tmp/bigquery/job_201702021456_0000/shard-1/data-*.json[3
estimated records]', task context 'TaskAttemptContext::
TaskAttemptID:attempt_201702021456_0000_m_000001_0 Status:'
17/02/02 14:57:23 INFO bigquery.DynamicFileListRecordReader: Initializing
DynamicFileListRecordReader with split 'InputSplit:: length:3 locations: []
toString(): gs://je-bi-production/hadoop/tmp/bigquery/job_201702021456_0000/shard-0/data-*.json[3
estimated records]', task context 'TaskAttemptContext::
TaskAttemptID:attempt_201702021456_0000_m_000000_0 Status:'
17/02/02 14:57:23 INFO codegen.CodeGenerator: Code generated in 92.93187 ms
17/02/02 14:57:23 INFO bigquery.DynamicFileListRecordReader: Adding new
file 'data-000000000000.json' of size 0 to knownFileSet.
17/02/02 14:57:23 INFO bigquery.DynamicFileListRecordReader: Moving to next
file 'gs://je-bi-production/hadoop/tmp/bigquery/job_201702021456_
0000/shard-1/data-000000000000.json' which has 0 bytes. Records read so
far: 0
17/02/02 14:57:23 INFO bigquery.DynamicFileListRecordReader: Adding new
file 'data-000000000000.json' of size 20424 to knownFileSet.
17/02/02 14:57:23 INFO bigquery.DynamicFileListRecordReader: Adding new
file 'data-000000000001.json' of size 0 to knownFileSet.
17/02/02 14:57:23 INFO bigquery.DynamicFileListRecordReader: Moving to next
file 'gs://je-bi-production/hadoop/tmp/bigquery/job_201702021456_
0000/shard-0/data-000000000000.json' which has 20424 bytes. Records read so
far: 0
17/02/02 14:57:24 INFO gcsio.GoogleCloudStorageReadChannel: Got 'range not
satisfiable' for reading gs://je-bi-production/hadoop/
tmp/bigquery/job_201702021456_0000/shard-1/data-000000000000.json at
position 0; assuming empty.
17/02/02 14:57:24 INFO bigquery.DynamicFileListRecordReader: Found
end-marker file 'data-000000000000.json' with index 0
17/02/02 14:57:24 INFO executor.Executor: Finished task 1.0 in stage 1.0
(TID 3). 1723 bytes result sent to driver
17/02/02 14:57:24 INFO scheduler.TaskSetManager: Finished task 1.0 in stage
1.0 (TID 3) in 844 ms on localhost (1/2)
17/02/02 14:57:24 INFO bigquery.DynamicFileListRecordReader: Moving to next
file 'gs://je-bi-production/hadoop/tmp/bigquery/job_201702021456_
0000/shard-0/data-000000000001.json' which has 0 bytes. Records read so
far: 6
17/02/02 14:57:24 INFO gcsio.GoogleCloudStorageReadChannel: Got 'range not
satisfiable' for reading gs://je-bi-production/hadoop/
tmp/bigquery/job_201702021456_0000/shard-0/data-000000000001.json at
position 0; assuming empty.
17/02/02 14:57:24 INFO bigquery.DynamicFileListRecordReader: Found
end-marker file 'data-000000000001.json' with index 1
17/02/02 14:57:24 INFO executor.Executor: Finished task 0.0 in stage 1.0
(TID 2). 1717 bytes result sent to driver
17/02/02 14:57:24 INFO scheduler.TaskSetManager: Finished task 0.0 in stage
1.0 (TID 2) in 1344 ms on localhost (2/2)
17/02/02 14:57:24 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0,
whose tasks have all completed, from pool
17/02/02 14:57:24 INFO scheduler.DAGScheduler: ShuffleMapStage 1 (start at
EventStreamer.scala:61) finished in 1.344 s
17/02/02 14:57:24 INFO scheduler.DAGScheduler: looking for newly runnable
stages
17/02/02 14:57:24 INFO scheduler.DAGScheduler: running: Set()
17/02/02 14:57:24 INFO scheduler.DAGScheduler: waiting: Set(ResultStage 2)
17/02/02 14:57:24 INFO scheduler.DAGScheduler: failed: Set()
17/02/02 14:57:24 INFO scheduler.DAGScheduler: Submitting ResultStage 2
(MapPartitionsRDD[9] at start at EventStreamer.scala:61), which has no
missing parents
17/02/02 14:57:24 INFO memory.MemoryStore: Block broadcast_3 stored as
values in memory (estimated size 7.0 KB, free 366.0 MB)
17/02/02 14:57:24 INFO memory.MemoryStore: Block broadcast_3_piece0 stored
as bytes in memory (estimated size 3.7 KB, free 366.0 MB)
17/02/02 14:57:24 INFO storage.BlockManagerInfo: Added broadcast_3_piece0
in memory on 127.0.0.1:51379 (size: 3.7 KB, free: 366.3 MB)
17/02/02 14:57:24 INFO spark.SparkContext: Created broadcast 3 from
broadcast at DAGScheduler.scala:1012
17/02/02 14:57:24 INFO scheduler.DAGScheduler: Submitting 1 missing tasks
from ResultStage 2 (MapPartitionsRDD[9] at start at EventStreamer.scala:61)
17/02/02 14:57:24 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0
with 1 tasks
17/02/02 14:57:24 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
2.0 (TID 4, localhost, partition 0, ANY, 6233 bytes)
17/02/02 14:57:24 INFO executor.Executor: Running task 0.0 in stage 2.0
(TID 4)
17/02/02 14:57:24 INFO storage.ShuffleBlockFetcherIterator: Getting 2
non-empty blocks out of 2 blocks
17/02/02 14:57:24 INFO storage.ShuffleBlockFetcherIterator: Started 0
remote fetches in 5 ms
17/02/02 14:57:24 INFO executor.Executor: Finished task 0.0 in stage 2.0
(TID 4). 1873 bytes result sent to driver
17/02/02 14:57:24 INFO scheduler.TaskSetManager: Finished task 0.0 in stage
2.0 (TID 4) in 37 ms on localhost (1/1)
17/02/02 14:57:24 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0,
whose tasks have all completed, from pool
17/02/02 14:57:24 INFO scheduler.DAGScheduler: ResultStage 2 (start at
EventStreamer.scala:61) finished in 0.037 s
17/02/02 14:57:24 INFO scheduler.DAGScheduler: Job 1 finished: start at
EventStreamer.scala:61, took 1.420357 s
17/02/02 14:57:24 INFO codegen.CodeGenerator: Code generated in 7.668964 ms
17/02/02 14:57:24 WARN streaming.BigQuerySink: ************ saving schema
is set to
17/02/02 14:57:24 WARN streaming.BigQuerySink:
StructType(StructField(customerid,StringType,true),
StructField(id,StringType,true), StructField(legacyorderid,StringType,true),
StructField(notifiycustomer,BooleanType,true), StructField(ordercontainer,
StructType(StructField(applicationinfo,StructType(
StructField(applicationname,StringType,true), StructField(
applicationversion,StringType,true), StructField(clientip,StringType,true),
StructField(jefeature,StringType,true),
StructField(useragent,StringType,true)),true),
StructField(basketinfo,StructType(StructField(basketid,StringType,true),
StructField(deliverycharge,LongType,true), StructField(discount,LongType,true),
StructField(discounts,ArrayType(StringType,true),true),
StructField(items,ArrayType(StructType(StructField(
combinedprice,DoubleType,true), StructField(description,StringType,true),
StructField(discounts,ArrayType(StringType,true),true),
StructField(mealparts,ArrayType(StringType,true),true),
StructField(menucardnumber,StringType,true), StructField(multibuydiscounts,
ArrayType(StringType,true),true), StructField(name,StringType,true),
StructField(optionalaccessories,ArrayType(StringType,true),true),
StructField(productid,StringType,true),
StructField(producttypeid,StringType,true),
StructField(requiredaccessories,ArrayType(StructType(StructField(groupid,StringType,true),
StructField(name,StringType,true),
StructField(requiredaccessoryid,StringType,true),
StructField(unitprice,LongType,true)),true),true),
StructField(synonym,StringType,true),
StructField(unitprice,DoubleType,true)),true),true),
StructField(menuid,StringType,true),
StructField(multibuydiscount,LongType,true),
StructField(subtotal,DoubleType,true), StructField(tospend,LongType,true),
StructField(total,DoubleType,true)),true), StructField(customerinfo,
StructType(StructField(address,StringType,true),
StructField(city,StringType,true), StructField(email,StringType,true),
StructField(id,StringType,true), StructField(name,StringType,true),
StructField(phonenumber,StringType,true),
StructField(postcode,StringType,true),
StructField(previousjeordercount,StringType,true), StructField(
previousrestuarantordercount,StringType,true),
StructField(timezone,StringType,true)),true),
StructField(id,StringType,true), StructField(islocked,BooleanType,true),
StructField(legacyid,StringType,true), StructField(order,StructType(
StructField(duedate,StringType,true),
StructField(duedatewithutcoffset,StringType,true),
StructField(initialduedate,StringType,true), StructField(
initialduedatewithutcoffset,StringType,true),
StructField(notetorestaurant,StringType,true),
StructField(orderable,BooleanType,true),
StructField(placeddate,StringType,true),
StructField(promptasap,BooleanType,true),
StructField(servicetype,StringType,true)),true),
StructField(paymentinfo,StructType(StructField(drivertipvalue,LongType,true),
StructField(orderid,StringType,true), StructField(paiddate,StringType,true),
StructField(paymentlines,ArrayType(StructType(StructField(cardfee,DoubleType,true),
StructField(cardtype,StringType,true),
StructField(paymenttransactionref,StringType,true),
StructField(pspname,StringType,true), StructField(type,StringType,true),
StructField(value,DoubleType,true)),true),true),
StructField(total,DoubleType,true),
StructField(totalcomplementary,LongType,true)),true),
StructField(restaurantinfo,StructType(StructField(addresslines,ArrayType(StringType,true),true),
StructField(city,StringType,true), StructField(dispatchmethod,StringType,true),
StructField(id,StringType,true), StructField(latitude,DoubleType,true),
StructField(longitude,DoubleType,true), StructField(name,StringType,true),
StructField(offline,BooleanType,true),
StructField(phonenumber,StringType,true),
StructField(postcode,StringType,true), StructField(seoname,StringType,true),
StructField(tempoffline,BooleanType,true)),true)),true),
StructField(orderid,StringType,true),
StructField(orderresolutionstatus,StringType,true),
StructField(raisingcomponent,StringType,true),
StructField(restaurantid,StringType,true),
StructField(tenant,StringType,true), StructField(timestamp,StringType,true))


On Thu, Feb 2, 2017 at 12:04 AM, Sam Elamin <hu...@gmail.com> wrote:

> There isn't a query per se.im writing the entire dataframe from the
> output of the read stream. Once I got that working I was planning to test
> the query aspect
>
>
> I'll do a bit more digging. Thank you very much for your help. Structued
> streaming is very exciting and I really am enjoying writing a connector for
> it!
>
> Regards
> Sam
> On Thu, 2 Feb 2017 at 00:02, Tathagata Das <ta...@gmail.com>
> wrote:
>
>> What is the query you are apply writeStream on? Essentially can you print
>> the whole query.
>>
>> Also, you can do StreamingQuery.explain() to see in full details how the
>> logical plan changes to physical plan, for a batch of data. that might
>> help. try doing that with some other sink to make sure the source works
>> correctly, and then try using your sink.
>>
>> If you want further debugging, then you will have to dig into the
>> StreamingExecution class in Spark, and debug stuff there.
>> https://github.com/apache/spark/blob/master/sql/core/src/
>> main/scala/org/apache/spark/sql/execution/streaming/Stream
>> Execution.scala#L523
>>
>> On Wed, Feb 1, 2017 at 3:49 PM, Sam Elamin <hu...@gmail.com>
>> wrote:
>>
>> Yeah sorry Im still working on it, its on a branch you can find here
>> <https://github.com/samelamin/spark-bigquery/blob/Stream_reading/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySource.scala>,
>> ignore the logging messages I was trying to workout how the APIs work and
>> unfortunately because I have to shade the dependency I cant debug it in an
>> IDE (that I know of! )
>>
>> So I can see the correct schema here
>> <https://github.com/samelamin/spark-bigquery/blob/Stream_reading/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySource.scala#L64> and
>> also when the df is returned(After .load() )
>>
>> But when that same df has writeStream applied to it, the addBatch
>> dataframe has a new schema. Its similar to the old schema but some ints
>> have been turned to strings.
>>
>>
>>
>> On Wed, Feb 1, 2017 at 11:40 PM, Tathagata Das <
>> tathagata.das1565@gmail.com> wrote:
>>
>> I am assuming that you have written your own BigQuerySource (i dont see
>> that code in the link you posted). In that source, you must have
>> implemented getBatch which uses offsets to return the Dataframe having the
>> data of a batch. Can you double check when this DataFrame returned by
>> getBatch, has the expected schema?
>>
>> On Wed, Feb 1, 2017 at 3:33 PM, Sam Elamin <hu...@gmail.com>
>> wrote:
>>
>> Thanks for the quick response TD!
>>
>> Ive been trying to identify where exactly this transformation happens
>>
>> The readStream returns a dataframe with the correct schema
>>
>> The minute I call writeStream, by the time I get to the addBatch method,
>> the dataframe there has an incorrect Schema
>>
>> So Im skeptical about the issue being prior to the readStream since the
>> output dataframe has the correct Schema
>>
>>
>> Am I missing something completely obvious?
>>
>> Regards
>> Sam
>>
>> On Wed, Feb 1, 2017 at 11:29 PM, Tathagata Das <
>> tathagata.das1565@gmail.com> wrote:
>>
>> You should make sure that schema of the streaming Dataset returned by
>> `readStream`, and the schema of the DataFrame returned by the sources
>> getBatch.
>>
>> On Wed, Feb 1, 2017 at 3:25 PM, Sam Elamin <hu...@gmail.com>
>> wrote:
>>
>> Hi All
>>
>> I am writing a bigquery connector here
>> <http://github.com/samelamin/spark-bigquery> and I am getting a strange
>> error with schemas being overwritten when a dataframe is passed over to the
>> Sink
>>
>>
>> for example the source returns this StructType
>> WARN streaming.BigQuerySource: StructType(StructField(custome
>> rid,LongType,true),
>>
>> and the sink is recieving this StructType
>> WARN streaming.BigQuerySink: StructType(StructField(custome
>> rid,StringType,true)
>>
>>
>> Any idea why this might be happening?
>> I dont have infering schema on
>>
>> spark.conf.set("spark.sql.streaming.schemaInference", "false")
>>
>> I know its off by default but I set it just to be sure
>>
>> So completely lost to what could be causing this
>>
>> Regards
>> Sam
>>
>>
>>
>>
>>
>>
>>

Re: Structured Streaming Schema Issue

Posted by Sam Elamin <hu...@gmail.com>.
There isn't a query per se; I'm writing the entire dataframe from the output
of the read stream. Once I've got that working I plan to test the
query aspect.
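i.e. roughly this shape, as a sketch (format names and checkpoint path are illustrative):

// Sketch: no transformations, just stream the source straight into the sink.
val query = spark.readStream
  .format("com.samelamin.spark.bigquery")              // illustrative
  .load()
  .writeStream
  .format("com.samelamin.spark.bigquery")              // illustrative
  .option("checkpointLocation", "/tmp/checkpoints")    // illustrative path
  .start()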


I'll do a bit more digging. Thank you very much for your help. Structured
Streaming is very exciting and I really am enjoying writing a connector for
it!

Regards
Sam
On Thu, 2 Feb 2017 at 00:02, Tathagata Das <ta...@gmail.com>
wrote:

> What is the query you are apply writeStream on? Essentially can you print
> the whole query.
>
> Also, you can do StreamingQuery.explain() to see in full details how the
> logical plan changes to physical plan, for a batch of data. that might
> help. try doing that with some other sink to make sure the source works
> correctly, and then try using your sink.
>
> If you want further debugging, then you will have to dig into the
> StreamingExecution class in Spark, and debug stuff there.
>
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L523
>
> On Wed, Feb 1, 2017 at 3:49 PM, Sam Elamin <hu...@gmail.com>
> wrote:
>
> Yeah sorry Im still working on it, its on a branch you can find here
> <https://github.com/samelamin/spark-bigquery/blob/Stream_reading/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySource.scala>,
> ignore the logging messages I was trying to workout how the APIs work and
> unfortunately because I have to shade the dependency I cant debug it in an
> IDE (that I know of! )
>
> So I can see the correct schema here
> <https://github.com/samelamin/spark-bigquery/blob/Stream_reading/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySource.scala#L64> and
> also when the df is returned(After .load() )
>
> But when that same df has writeStream applied to it, the addBatch
> dataframe has a new schema. Its similar to the old schema but some ints
> have been turned to strings.
>
>
>
> On Wed, Feb 1, 2017 at 11:40 PM, Tathagata Das <
> tathagata.das1565@gmail.com> wrote:
>
> I am assuming that you have written your own BigQuerySource (i dont see
> that code in the link you posted). In that source, you must have
> implemented getBatch which uses offsets to return the Dataframe having the
> data of a batch. Can you double check when this DataFrame returned by
> getBatch, has the expected schema?
>
> On Wed, Feb 1, 2017 at 3:33 PM, Sam Elamin <hu...@gmail.com>
> wrote:
>
> Thanks for the quick response TD!
>
> Ive been trying to identify where exactly this transformation happens
>
> The readStream returns a dataframe with the correct schema
>
> The minute I call writeStream, by the time I get to the addBatch method,
> the dataframe there has an incorrect Schema
>
> So Im skeptical about the issue being prior to the readStream since the
> output dataframe has the correct Schema
>
>
> Am I missing something completely obvious?
>
> Regards
> Sam
>
> On Wed, Feb 1, 2017 at 11:29 PM, Tathagata Das <
> tathagata.das1565@gmail.com> wrote:
>
> You should make sure that schema of the streaming Dataset returned by
> `readStream`, and the schema of the DataFrame returned by the sources
> getBatch.
>
> On Wed, Feb 1, 2017 at 3:25 PM, Sam Elamin <hu...@gmail.com>
> wrote:
>
> Hi All
>
> I am writing a bigquery connector here
> <http://github.com/samelamin/spark-bigquery> and I am getting a strange
> error with schemas being overwritten when a dataframe is passed over to the
> Sink
>
>
> for example the source returns this StructType
> WARN streaming.BigQuerySource:
> StructType(StructField(customerid,LongType,true),
>
> and the sink is recieving this StructType
> WARN streaming.BigQuerySink:
> StructType(StructField(customerid,StringType,true)
>
>
> Any idea why this might be happening?
> I dont have infering schema on
>
> spark.conf.set("spark.sql.streaming.schemaInference", "false")
>
> I know its off by default but I set it just to be sure
>
> So completely lost to what could be causing this
>
> Regards
> Sam
>
>
>
>
>
>
>

Re: Structured Streaming Schema Issue

Posted by Tathagata Das <ta...@gmail.com>.
What is the query you are applying writeStream to? Essentially, can you print
the whole query?

Also, you can call StreamingQuery.explain() to see in full detail how the
logical plan changes to the physical plan for a batch of data. That might
help. Try doing that with some other sink to make sure the source works
correctly, and then try using your sink.

If you want to debug further, you will have to dig into the
StreamExecution class in Spark and debug things there:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L523
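For example, something along these lines (a sketch; the source format name is illustrative):

// Sketch: run the stream against the built-in console sink and inspect the
// plans for a batch. The source format name is illustrative.
val df = spark.readStream
  .format("com.samelamin.spark.bigquery")   // illustrative
  .load()

val query = df.writeStream
  .format("console")
  .start()

query.awaitTermination(10000)     // let a batch or two run
query.explain(extended = true)    // prints the logical and physical plans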

On Wed, Feb 1, 2017 at 3:49 PM, Sam Elamin <hu...@gmail.com> wrote:

> Yeah sorry Im still working on it, its on a branch you can find here
> <https://github.com/samelamin/spark-bigquery/blob/Stream_reading/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySource.scala>,
> ignore the logging messages I was trying to workout how the APIs work and
> unfortunately because I have to shade the dependency I cant debug it in an
> IDE (that I know of! )
>
> So I can see the correct schema here
> <https://github.com/samelamin/spark-bigquery/blob/Stream_reading/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySource.scala#L64> and
> also when the df is returned(After .load() )
>
> But when that same df has writeStream applied to it, the addBatch
> dataframe has a new schema. Its similar to the old schema but some ints
> have been turned to strings.
>
>
>
> On Wed, Feb 1, 2017 at 11:40 PM, Tathagata Das <
> tathagata.das1565@gmail.com> wrote:
>
>> I am assuming that you have written your own BigQuerySource (i dont see
>> that code in the link you posted). In that source, you must have
>> implemented getBatch which uses offsets to return the Dataframe having the
>> data of a batch. Can you double check when this DataFrame returned by
>> getBatch, has the expected schema?
>>
>> On Wed, Feb 1, 2017 at 3:33 PM, Sam Elamin <hu...@gmail.com>
>> wrote:
>>
>>> Thanks for the quick response TD!
>>>
>>> Ive been trying to identify where exactly this transformation happens
>>>
>>> The readStream returns a dataframe with the correct schema
>>>
>>> The minute I call writeStream, by the time I get to the addBatch method,
>>> the dataframe there has an incorrect Schema
>>>
>>> So Im skeptical about the issue being prior to the readStream since the
>>> output dataframe has the correct Schema
>>>
>>>
>>> Am I missing something completely obvious?
>>>
>>> Regards
>>> Sam
>>>
>>> On Wed, Feb 1, 2017 at 11:29 PM, Tathagata Das <
>>> tathagata.das1565@gmail.com> wrote:
>>>
>>>> You should make sure that schema of the streaming Dataset returned by
>>>> `readStream`, and the schema of the DataFrame returned by the sources
>>>> getBatch.
>>>>
>>>> On Wed, Feb 1, 2017 at 3:25 PM, Sam Elamin <hu...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi All
>>>>>
>>>>> I am writing a bigquery connector here
>>>>> <http://github.com/samelamin/spark-bigquery> and I am getting a
>>>>> strange error with schemas being overwritten when a dataframe is passed
>>>>> over to the Sink
>>>>>
>>>>>
>>>>> for example the source returns this StructType
>>>>> WARN streaming.BigQuerySource: StructType(StructField(custome
>>>>> rid,LongType,true),
>>>>>
>>>>> and the sink is recieving this StructType
>>>>> WARN streaming.BigQuerySink: StructType(StructField(custome
>>>>> rid,StringType,true)
>>>>>
>>>>>
>>>>> Any idea why this might be happening?
>>>>> I dont have infering schema on
>>>>>
>>>>> spark.conf.set("spark.sql.streaming.schemaInference", "false")
>>>>>
>>>>> I know its off by default but I set it just to be sure
>>>>>
>>>>> So completely lost to what could be causing this
>>>>>
>>>>> Regards
>>>>> Sam
>>>>>
>>>>
>>>>
>>>
>>
>

Re: Structured Streaming Schema Issue

Posted by Sam Elamin <hu...@gmail.com>.
Yeah, sorry, I'm still working on it; it's on a branch you can find here
<https://github.com/samelamin/spark-bigquery/blob/Stream_reading/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySource.scala>.
Ignore the logging messages; I was trying to work out how the APIs work, and
unfortunately, because I have to shade the dependency, I can't debug it in an
IDE (that I know of!)

So I can see the correct schema here
<https://github.com/samelamin/spark-bigquery/blob/Stream_reading/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySource.scala#L64>
and also when the df is returned (after .load()).

But when that same df has writeStream applied to it, the addBatch dataframe
has a new schema. It's similar to the old schema, but some ints have been
turned into strings.
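On the sink side I'm just logging the schema that addBatch receives, roughly like this trimmed sketch (the actual write to BigQuery is omitted; names are illustrative):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

// Trimmed sketch of the sink, logging the schema it actually receives.
class BigQuerySink extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    println(s"addBatch $batchId schema: ${data.schema.treeString}")
    // ... write `data` to BigQuery here (omitted)
  }
}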



On Wed, Feb 1, 2017 at 11:40 PM, Tathagata Das <ta...@gmail.com>
wrote:

> I am assuming that you have written your own BigQuerySource (i dont see
> that code in the link you posted). In that source, you must have
> implemented getBatch which uses offsets to return the Dataframe having the
> data of a batch. Can you double check when this DataFrame returned by
> getBatch, has the expected schema?
>
> On Wed, Feb 1, 2017 at 3:33 PM, Sam Elamin <hu...@gmail.com>
> wrote:
>
>> Thanks for the quick response TD!
>>
>> Ive been trying to identify where exactly this transformation happens
>>
>> The readStream returns a dataframe with the correct schema
>>
>> The minute I call writeStream, by the time I get to the addBatch method,
>> the dataframe there has an incorrect Schema
>>
>> So Im skeptical about the issue being prior to the readStream since the
>> output dataframe has the correct Schema
>>
>>
>> Am I missing something completely obvious?
>>
>> Regards
>> Sam
>>
>> On Wed, Feb 1, 2017 at 11:29 PM, Tathagata Das <
>> tathagata.das1565@gmail.com> wrote:
>>
>>> You should make sure that schema of the streaming Dataset returned by
>>> `readStream`, and the schema of the DataFrame returned by the sources
>>> getBatch.
>>>
>>> On Wed, Feb 1, 2017 at 3:25 PM, Sam Elamin <hu...@gmail.com>
>>> wrote:
>>>
>>>> Hi All
>>>>
>>>> I am writing a bigquery connector here
>>>> <http://github.com/samelamin/spark-bigquery> and I am getting a
>>>> strange error with schemas being overwritten when a dataframe is passed
>>>> over to the Sink
>>>>
>>>>
>>>> for example the source returns this StructType
>>>> WARN streaming.BigQuerySource: StructType(StructField(custome
>>>> rid,LongType,true),
>>>>
>>>> and the sink is recieving this StructType
>>>> WARN streaming.BigQuerySink: StructType(StructField(custome
>>>> rid,StringType,true)
>>>>
>>>>
>>>> Any idea why this might be happening?
>>>> I dont have infering schema on
>>>>
>>>> spark.conf.set("spark.sql.streaming.schemaInference", "false")
>>>>
>>>> I know its off by default but I set it just to be sure
>>>>
>>>> So completely lost to what could be causing this
>>>>
>>>> Regards
>>>> Sam
>>>>
>>>
>>>
>>
>

Re: Structured Streaming Schema Issue

Posted by Tathagata Das <ta...@gmail.com>.
I am assuming that you have written your own BigQuerySource (I don't see
that code in the link you posted). In that source, you must have
implemented getBatch, which uses offsets to return the DataFrame containing
the data of a batch. Can you double check whether the DataFrame returned by
getBatch has the expected schema?
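For example, a quick check right before getBatch returns, along these lines (a sketch; the helper name is illustrative):

import org.apache.spark.sql.DataFrame

// Sketch: print the schema of the batch DataFrame just before returning it
// from getBatch, so it can be diffed against what the sink's addBatch sees.
def checkedBatch(batchDf: DataFrame): DataFrame = {
  println(s"getBatch schema:\n${batchDf.schema.treeString}")
  batchDf
}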

On Wed, Feb 1, 2017 at 3:33 PM, Sam Elamin <hu...@gmail.com> wrote:

> Thanks for the quick response TD!
>
> Ive been trying to identify where exactly this transformation happens
>
> The readStream returns a dataframe with the correct schema
>
> The minute I call writeStream, by the time I get to the addBatch method,
> the dataframe there has an incorrect Schema
>
> So Im skeptical about the issue being prior to the readStream since the
> output dataframe has the correct Schema
>
>
> Am I missing something completely obvious?
>
> Regards
> Sam
>
> On Wed, Feb 1, 2017 at 11:29 PM, Tathagata Das <
> tathagata.das1565@gmail.com> wrote:
>
>> You should make sure that schema of the streaming Dataset returned by
>> `readStream`, and the schema of the DataFrame returned by the sources
>> getBatch.
>>
>> On Wed, Feb 1, 2017 at 3:25 PM, Sam Elamin <hu...@gmail.com>
>> wrote:
>>
>>> Hi All
>>>
>>> I am writing a bigquery connector here
>>> <http://github.com/samelamin/spark-bigquery> and I am getting a strange
>>> error with schemas being overwritten when a dataframe is passed over to the
>>> Sink
>>>
>>>
>>> for example the source returns this StructType
>>> WARN streaming.BigQuerySource: StructType(StructField(custome
>>> rid,LongType,true),
>>>
>>> and the sink is recieving this StructType
>>> WARN streaming.BigQuerySink: StructType(StructField(custome
>>> rid,StringType,true)
>>>
>>>
>>> Any idea why this might be happening?
>>> I dont have infering schema on
>>>
>>> spark.conf.set("spark.sql.streaming.schemaInference", "false")
>>>
>>> I know its off by default but I set it just to be sure
>>>
>>> So completely lost to what could be causing this
>>>
>>> Regards
>>> Sam
>>>
>>
>>
>

Re: Structured Streaming Schema Issue

Posted by Sam Elamin <hu...@gmail.com>.
Thanks for the quick response TD!

I've been trying to identify where exactly this transformation happens.

The readStream returns a dataframe with the correct schema

The minute I call writeStream, by the time I get to the addBatch method,
the dataframe there has an incorrect schema.

So I'm skeptical about the issue being prior to the readStream, since the
output dataframe has the correct schema.


Am I missing something completely obvious?

Regards
Sam

On Wed, Feb 1, 2017 at 11:29 PM, Tathagata Das <ta...@gmail.com>
wrote:

> You should make sure that schema of the streaming Dataset returned by
> `readStream`, and the schema of the DataFrame returned by the sources
> getBatch.
>
> On Wed, Feb 1, 2017 at 3:25 PM, Sam Elamin <hu...@gmail.com>
> wrote:
>
>> Hi All
>>
>> I am writing a bigquery connector here
>> <http://github.com/samelamin/spark-bigquery> and I am getting a strange
>> error with schemas being overwritten when a dataframe is passed over to the
>> Sink
>>
>>
>> for example the source returns this StructType
>> WARN streaming.BigQuerySource: StructType(StructField(custome
>> rid,LongType,true),
>>
>> and the sink is recieving this StructType
>> WARN streaming.BigQuerySink: StructType(StructField(custome
>> rid,StringType,true)
>>
>>
>> Any idea why this might be happening?
>> I dont have infering schema on
>>
>> spark.conf.set("spark.sql.streaming.schemaInference", "false")
>>
>> I know its off by default but I set it just to be sure
>>
>> So completely lost to what could be causing this
>>
>> Regards
>> Sam
>>
>
>

Re: Structured Streaming Schema Issue

Posted by Tathagata Das <ta...@gmail.com>.
You should make sure that the schema of the streaming Dataset returned by
`readStream` and the schema of the DataFrame returned by the source's
getBatch are the same.
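A minimal sketch of that check (`streamingDf` is the result of readStream...load(), `batchDf` is what your getBatch returns):

import org.apache.spark.sql.DataFrame

// Sketch: assert that the two schemas match before the sink ever sees data.
def assertSameSchema(streamingDf: DataFrame, batchDf: DataFrame): Unit =
  require(streamingDf.schema == batchDf.schema,
    s"Schema mismatch:\n${streamingDf.schema.treeString}\nvs\n${batchDf.schema.treeString}")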

On Wed, Feb 1, 2017 at 3:25 PM, Sam Elamin <hu...@gmail.com> wrote:

> Hi All
>
> I am writing a bigquery connector here
> <http://github.com/samelamin/spark-bigquery> and I am getting a strange
> error with schemas being overwritten when a dataframe is passed over to the
> Sink
>
>
> for example the source returns this StructType
> WARN streaming.BigQuerySource: StructType(StructField(
> customerid,LongType,true),
>
> and the sink is recieving this StructType
> WARN streaming.BigQuerySink: StructType(StructField(
> customerid,StringType,true)
>
>
> Any idea why this might be happening?
> I dont have infering schema on
>
> spark.conf.set("spark.sql.streaming.schemaInference", "false")
>
> I know its off by default but I set it just to be sure
>
> So completely lost to what could be causing this
>
> Regards
> Sam
>