Posted to user@spark.apache.org by Aniruddha P Tekade <at...@binghamton.edu> on 2019/11/23 01:17:10 UTC

Can Spark convert String to Integer when reading with a schema in Structured Streaming?

Hi,

I am new to Spark and am learning Spark Structured Streaming. I am
specifying the schema with a case class and encoders to get the streaming
DataFrame.

import java.sql.Timestamp
import org.apache.spark.sql.Encoders

case class SampleLogEntry(
  dateTime: Timestamp,
  clientIp: String,
  userId: String,
  operation: String,
  bucketName: String,
  contAccUsrId: String,
  reqHeader: Integer,
  reqBody: Integer,
  respHeader: Integer,
  respBody: Integer,
  totalReqResSize: Integer,
  duration: Integer,
  objectName: String,
  httpStatus: Integer,
  s3ReqId: String,
  etag: String,
  errCode: Integer,
  srcBucket: String
)

// Derive the StructType schema from the case class using encoders
val sampleLogSchema = Encoders.product[SampleLogEntry].schema

val rawData = spark
  .readStream
  .format("csv") // assumed CSV source, given the delimiter and header options
  .option("delimiter", "|")
  .option("header", "true")
  .schema(sampleLogSchema)
  .load("/Users/home/learning-spark/logs")

However, with this schema I get only null values:

-------------------------------------------
Batch: 0
-------------------------------------------
+--------+----+------+-----+----------+------------+---------+-------+----------+--------+---------+--------+----------+----------+--------+---------+-------+---------+
|dateTime|  IP|userId|s3Api|bucketName|accessUserId|reqHeader|reqBody|respHeader|respBody|totalSize|duration|objectName|httpStatus|reqestId|objectTag|errCode|srcBucket|
+--------+----+------+-----+----------+------------+---------+-------+----------+--------+---------+--------+----------+----------+--------+---------+-------+---------+
|    null|null|  null| null|      null|        null|     null|   null|      null|    null|     null|    null|      null|      null|    null|     null|   null|     null|
|    null|null|  null| null|      null|        null|     null|   null|      null|    null|     null|    null|      null|      null|    null|     null|   null|     null|
|    null|null|  null| null|      null|        null|     null|   null|      null|    null|     null|    null|      null|      null|    null|     null|   null|     null|
|    null|null|  null| null|      null|        null|     null|   null|      null|    null|     null|    null|      null|      null|    null|     null|   null|     null|

After trying several options, such as inferring the schema from sample data
and defining the schema as a StructType, I changed every field in this
schema to String:

case class SampleLogEntry(
  dateTime: String,
  IP: String,
  userId: String,
  s3Api: String,
  bucketName: String,
  accessUserId: String,
  reqHeader: String,
  reqBody: String,
  respHeader: String,
  respBody: String,
  totalSize: String,
  duration: String,
  objectName: String,
  httpStatus: String,
  reqestId: String,
  objectTag: String,
  errCode: String,
  srcBucket: String
)


This gets me the following expected output:

+--------------------+---------+------+-------+----------+------------+---------+-------+----------+--------+---------+--------+----------+----------+--------------------+---------+-----------------+---------+
|            dateTime|       IP|userId|  s3Api|bucketName|accessUserId|reqHeader|reqBody|respHeader|respBody|totalSize|duration|objectName|httpStatus|            reqestId|objectTag|          errCode|srcBucket|
+--------------------+---------+------+-------+----------+------------+---------+-------+----------+--------+---------+--------+----------+----------+--------------------+---------+-----------------+---------+
|2019-07-18 00:00:...|10.29.2.5|  null|unknown|      null|        null|        0|      0|         0|     250|      250|     247|      null|       400|08084d90-299e-14a...|        0|InvalidBucketName|     null|
|2019-07-18 00:00:...|10.29.2.6|  null|unknown|      null|        null|        0|      0|         0|     250|      250|     291|      null|       400|08084d92-299e-14a...|        0|InvalidBucketName|     null|
|2019-07-18 00:00:...|10.29.2.5|  null|unknown|      null|        null|        0|      0|         0|     250|      250|     246|      null|       400|08084d94-299e-14a...|        0|InvalidBucketName|     null|
|2019-07-18 00:00:...|10.29.2.6|  null|unknown|      null|        null|        0|      0|         0|     250|      250|     227|      null|       400|08084d96-299e-14a...|        0|InvalidBucketName|     null|
|2019-07-18 00:00:...|10.29.2.5|  null|unknown|      null|        null|        0|      0|         0|     250|      250|     287|      null|       400|08084d98-299e-14a...|        0|InvalidBucketName|     null|

But this is not what I want. Apart from the documentation, I am following
the book Stream Processing with Apache Spark by Francois Garillot and
Gerard Maas, which is where I found this way of using encoders to provide
a schema in Scala. Is there any other way to do this? My logs are pipe (|)
separated records. The source nodes write every field as a string, and I
cannot change that.

How do I set up the schema so that I can read the log files with the
specified data type for each field? If Spark cannot convert a String to an
Integer, what is the workaround?
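
One possible workaround, shown here only as a minimal sketch, is to keep the
all-String schema for reading and cast individual columns afterwards. The
object name TypedLogStream, the helper withTypes, the choice of which columns
are numeric, the "csv" format, and the local[*] master are all assumptions
made for illustration. errCode is left as a String because the sample output
above contains values such as InvalidBucketName. The plain cast on dateTime
assumes the full value is in a form Spark's default string-to-timestamp cast
accepts, and with non-ANSI cast behavior a value that cannot be cast becomes
null in that column only.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType, TimestampType}

object TypedLogStream {
  // Column names taken from the all-String case class in this post.
  val allColumns: Seq[String] = Seq(
    "dateTime", "IP", "userId", "s3Api", "bucketName", "accessUserId",
    "reqHeader", "reqBody", "respHeader", "respBody", "totalSize", "duration",
    "objectName", "httpStatus", "reqestId", "objectTag", "errCode", "srcBucket")

  // Assumption: these columns hold integers. errCode stays a String because
  // the sample rows show values such as InvalidBucketName.
  val intColumns: Set[String] = Set(
    "reqHeader", "reqBody", "respHeader", "respBody",
    "totalSize", "duration", "httpStatus")

  // Every field is read as a nullable String, as in the working version.
  val stringSchema: StructType =
    StructType(allColumns.map(name => StructField(name, StringType, nullable = true)))

  // Cast selected columns after reading; all other columns pass through.
  def withTypes(df: DataFrame): DataFrame =
    df.select(allColumns.map {
      case "dateTime"          => col("dateTime").cast(TimestampType).as("dateTime")
      case c if intColumns(c)  => col(c).cast(IntegerType).as(c)
      case c                   => col(c)
    }: _*)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("typed-log-stream")
      .master("local[*]") // assumption: local run for experimentation
      .getOrCreate()

    val raw = spark.readStream
      .format("csv") // assumption: CSV source, given delimiter/header options
      .option("delimiter", "|")
      .option("header", "true")
      .schema(stringSchema)
      .load("/Users/home/learning-spark/logs")

    // Print the typed stream to the console, one micro-batch at a time.
    val query = withTypes(raw).writeStream.format("console").start()
    query.awaitTermination()
  }
}
```

Calling withTypes(raw).printSchema() should then report the chosen columns
as integer and dateTime as timestamp, while the remaining columns stay
string.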

Best,
Aniruddha