Posted to user@spark.apache.org by Aniruddha P Tekade <at...@binghamton.edu> on 2019/11/23 01:17:10 UTC
Can Spark convert String to Integer when reading using a schema in structured streaming?
Hi,
I am new to Spark and learning Spark Structured Streaming. I am using
Structured Streaming with a schema specified with the help of a case class
and encoders to get the streaming DataFrame:
case class SampleLogEntry(
  dateTime: Timestamp,
  clientIp: String,
  userId: String,
  operation: String,
  bucketName: String,
  contAccUsrId: String,
  reqHeader: Integer,
  reqBody: Integer,
  respHeader: Integer,
  respBody: Integer,
  totalReqResSize: Integer,
  duration: Integer,
  objectName: String,
  httpStatus: Integer,
  s3ReqId: String,
  etag: String,
  errCode: Integer,
  srcBucket: String
)
val sampleLogSchema = Encoders.product[SampleLogEntry].schema // using encoders

val rawData = spark
  .readStream
  .format("csv") // format string was blank in the archived mail; "csv" matches the delimiter/header options
  .option("delimiter", "|")
  .option("header", "true")
  .schema(sampleLogSchema)
  .load("/Users/home/learning-spark/logs")
However, I am getting only null values with this schema -
-------------------------------------------
Batch: 0
-------------------------------------------
+--------+----+------+-----+----------+------------+---------+-------+----------+--------+---------+--------+----------+----------+--------+---------+-------+---------+
|dateTime| IP|userId|s3Api|bucketName|accessUserId|reqHeader|reqBody|respHeader|respBody|totalSize|duration|objectName|httpStatus|reqestId|objectTag|errCode|srcBucket|
+--------+----+------+-----+----------+------------+---------+-------+----------+--------+---------+--------+----------+----------+--------+---------+-------+---------+
|    null|null|  null| null|      null|        null|     null|   null|      null|    null|     null|    null|      null|      null|    null|     null|   null|     null|
|    null|null|  null| null|      null|        null|     null|   null|      null|    null|     null|    null|      null|      null|    null|     null|   null|     null|
|    null|null|  null| null|      null|        null|     null|   null|      null|    null|     null|    null|      null|      null|    null|     null|   null|     null|
|    null|null|  null| null|      null|        null|     null|   null|      null|    null|     null|    null|      null|      null|    null|     null|   null|     null|
After trying multiple options, such as deriving the schema from sample data
and defining the schema as a StructType, I changed every field in this
schema to String -
case class SampleLogEntry(
  dateTime: String,
  IP: String,
  userId: String,
  s3Api: String,
  bucketName: String,
  accessUserId: String,
  reqHeader: String,
  reqBody: String,
  respHeader: String,
  respBody: String,
  totalSize: String,
  duration: String,
  objectName: String,
  httpStatus: String,
  reqestId: String,
  objectTag: String,
  errCode: String,
  srcBucket: String
)
This gets me the following output -
+--------------------+---------+------+-------+----------+------------+---------+-------+----------+--------+---------+--------+----------+----------+--------------------+---------+-----------------+---------+
|            dateTime|       IP|userId|  s3Api|bucketName|accessUserId|reqHeader|reqBody|respHeader|respBody|totalSize|duration|objectName|httpStatus|            reqestId|objectTag|          errCode|srcBucket|
+--------------------+---------+------+-------+----------+------------+---------+-------+----------+--------+---------+--------+----------+----------+--------------------+---------+-----------------+---------+
|2019-07-18 00:00:...|10.29.2.5|  null|unknown|      null|        null|        0|      0|         0|     250|      250|     247|      null|       400|08084d90-299e-14a...|        0|InvalidBucketName|     null|
|2019-07-18 00:00:...|10.29.2.6|  null|unknown|      null|        null|        0|      0|         0|     250|      250|     291|      null|       400|08084d92-299e-14a...|        0|InvalidBucketName|     null|
|2019-07-18 00:00:...|10.29.2.5|  null|unknown|      null|        null|        0|      0|         0|     250|      250|     246|      null|       400|08084d94-299e-14a...|        0|InvalidBucketName|     null|
|2019-07-18 00:00:...|10.29.2.6|  null|unknown|      null|        null|        0|      0|         0|     250|      250|     227|      null|       400|08084d96-299e-14a...|        0|InvalidBucketName|     null|
|2019-07-18 00:00:...|10.29.2.5|  null|unknown|      null|        null|        0|      0|         0|     250|      250|     287|      null|       400|08084d98-299e-14a...|        0|InvalidBucketName|     null|
But this is not what I want. Apart from the documentation, I am following
the book Stream Processing with Apache Spark by Francois Garillot and
Gerard Maas, and that is where I found these ways of using encoders to
provide a schema for Scala development. Is there any other way I can do
this? My logs are pipe (|) separated records, and the source nodes record
every field as a string, which I cannot change.
How do I set up the schema so that I can read data from the log files with
the specified datatypes for the fields? If Spark cannot convert a String to
an Integer, what is the workaround?
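For context, the workaround I am currently considering (an untested sketch,
using the column names from my all-String case class above) is to read
every field as String and cast the numeric and timestamp columns
afterwards:

```scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, TimestampType}

// rawData is the streaming DataFrame read with the all-String schema above.
// cast() yields null for unparseable values, so one bad field should not
// null out the other columns of the row.
val typed = rawData
  .withColumn("dateTime",   col("dateTime").cast(TimestampType))
  .withColumn("reqHeader",  col("reqHeader").cast(IntegerType))
  .withColumn("reqBody",    col("reqBody").cast(IntegerType))
  .withColumn("respHeader", col("respHeader").cast(IntegerType))
  .withColumn("respBody",   col("respBody").cast(IntegerType))
  .withColumn("totalSize",  col("totalSize").cast(IntegerType))
  .withColumn("duration",   col("duration").cast(IntegerType))
  .withColumn("httpStatus", col("httpStatus").cast(IntegerType))
```

But this feels verbose, so I would prefer a cleaner way if one exists.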
Best,
Aniruddha