You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by lsn24 <le...@gmail.com> on 2018/04/13 17:02:31 UTC

Spark parse fixed length file [Java]

Hello,

 We are running into issues while trying to process fixed length files using
spark.

The approach we took is as follows:

1. Read the .bz2 file  into a dataset from hdfs using
spark.read().textFile() API.Create a temporary view.

     Dataset<String> rawDataset = sparkSession.read().textFile(filePath);
     rawDataset.createOrReplaceTempView(tempView);

2. Run a sql query on the view, to slice and dice the data the way we need
it (using substring).

 (SELECT 
                     TRIM(SUBSTRING(value,1 ,16)) AS record1 ,
                     TRIM(SUBSTRING(value,17 ,8)) AS record2 ,
                     TRIM(SUBSTRING(value,25 ,5)) AS record3 ,
                     TRIM(SUBSTRING(value,30 ,16)) AS record4 ,
                     CAST(SUBSTRING(value,46 ,8) AS BIGINT) AS record5 , 
                     CAST(SUBSTRING(value,54 ,6) AS BIGINT) AS record6 , 
                     CAST(SUBSTRING(value,60 ,3) AS BIGINT) AS record7 , 
                     CAST(SUBSTRING(value,63 ,6) AS BIGINT) AS record8 , 
                     TRIM(SUBSTRING(value,69 ,20)) AS record9 ,
                     TRIM(SUBSTRING(value,89 ,40)) AS record10 ,
                     TRIM(SUBSTRING(value,129 ,32)) AS record11 ,
                     TRIM(SUBSTRING(value,161 ,19)) AS record12,
                     TRIM(SUBSTRING(value,180 ,1)) AS record13 ,
                     TRIM(SUBSTRING(value,181 ,9)) AS record14 ,
                     TRIM(SUBSTRING(value,190 ,3)) AS record15 ,
                     CAST(SUBSTRING(value,193 ,8) AS BIGINT) AS record16 , 
                     CAST(SUBSTRING(value,201 ,8) AS BIGINT) AS record17 
                     FROM tempView)

3.Write the output of sql query to a parquet file.
     loadDataset.write().mode(SaveMode.Append).parquet(outputDirectory);

Problem :

  The step #2 takes a longer time , if the length of line is ~2000
characters. If each line in the file is only 1000 characters then it takes
only 4 minutes to process 20 million lines. If we increase the line length
to 2000 characters it takes 20 minutes to process 20 million lines.


Is there a better way in spark to parse fixed length lines?


*Note: *Spark version we use is 2.2.0 and we are using  Spark with Java.




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Spark parse fixed length file [Java]

Posted by lsn24 <le...@gmail.com>.
Thanks for the response JayeshLalwani. Clearly in my case the issue was with
my approach, not with the memory.

The job was taking much longer time even for smaller dataset.

Thanks again!



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Spark parse fixed length file [Java]

Posted by "Lalwani, Jayesh" <Ja...@capitalone.com>.
Is your input data partitioned? How much memory have you assigned to your executor? Have you looked at how much time is being spent in GC in the executor? Is Spark spilling the data into disk?

It is likely that the partition is too big. Spark tries to read the whole partition into the memory of one executor node.  If the partition is too big, it might be causing Spark to run out of memory. One of the side effects of how the JVM does garbage collection is that when applications use too much memory, they just might run very slowly instead of crashing.

If the problem is that the partition is too big, increasing executor memory, or reducing size of partition will do the trick
On 4/13/18, 1:03 PM, "lsn24" <le...@gmail.com> wrote:

    Hello,
    
     We are running into issues while trying to process fixed length files using
    spark.
    
    The approach we took is as follows:
    
    1. Read the .bz2 file  into a dataset from hdfs using
    spark.read().textFile() API.Create a temporary view.
    
         Dataset<String> rawDataset = sparkSession.read().textFile(filePath);
         rawDataset.createOrReplaceTempView(tempView);
    
    2. Run a sql query on the view, to slice and dice the data the way we need
    it (using substring).
    
     (SELECT 
                         TRIM(SUBSTRING(value,1 ,16)) AS record1 ,
                         TRIM(SUBSTRING(value,17 ,8)) AS record2 ,
                         TRIM(SUBSTRING(value,25 ,5)) AS record3 ,
                         TRIM(SUBSTRING(value,30 ,16)) AS record4 ,
                         CAST(SUBSTRING(value,46 ,8) AS BIGINT) AS record5 , 
                         CAST(SUBSTRING(value,54 ,6) AS BIGINT) AS record6 , 
                         CAST(SUBSTRING(value,60 ,3) AS BIGINT) AS record7 , 
                         CAST(SUBSTRING(value,63 ,6) AS BIGINT) AS record8 , 
                         TRIM(SUBSTRING(value,69 ,20)) AS record9 ,
                         TRIM(SUBSTRING(value,89 ,40)) AS record10 ,
                         TRIM(SUBSTRING(value,129 ,32)) AS record11 ,
                         TRIM(SUBSTRING(value,161 ,19)) AS record12,
                         TRIM(SUBSTRING(value,180 ,1)) AS record13 ,
                         TRIM(SUBSTRING(value,181 ,9)) AS record14 ,
                         TRIM(SUBSTRING(value,190 ,3)) AS record15 ,
                         CAST(SUBSTRING(value,193 ,8) AS BIGINT) AS record16 , 
                         CAST(SUBSTRING(value,201 ,8) AS BIGINT) AS record17 
                         FROM tempView)
    
    3.Write the output of sql query to a parquet file.
         loadDataset.write().mode(SaveMode.Append).parquet(outputDirectory);
    
    Problem :
    
      The step #2 takes a longer time , if the length of line is ~2000
    characters. If each line in the file is only 1000 characters then it takes
    only 4 minutes to process 20 million lines. If we increase the line length
    to 2000 characters it takes 20 minutes to process 20 million lines.
    
    
    Is there a better way in spark to parse fixed length lines?
    
    
    *Note: *Spark version we use is 2.2.0 and we are using  Spark with Java.
    
    
    
    
    --
    Sent from: https://urldefense.proofpoint.com/v2/url?u=http-3A__apache-2Dspark-2Duser-2Dlist.1001560.n3.nabble.com_&d=DwICAg&c=pLULRYW__RtkwsQUPxJVDGboCTdgji3AcHNJU0BpTJE&r=F2RNeGILvLdBxn7RJ4effes_QFIiEsoVM2rPi9qX1DKow5HQSjq0_WhIW109SXQ4&m=ORIxa-UsHhr60x9Hbkh4BdKPgKY-bRmOtdbI7O2kaD8&s=McmJKRfol4LsD8u1kSP3gNdK5tMH8fDPvPKKRpOUQ1w&e=
    
    ---------------------------------------------------------------------
    To unsubscribe e-mail: user-unsubscribe@spark.apache.org
    
    

________________________________________________________

The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Spark parse fixed length file [Java]

Posted by lsn24 <le...@gmail.com>.
I was able to solve it by writing a java method (to slice and dice data) and
invoking the method/function from spark.map. This transformed the data way 
faster than my previous approach. 

Thanks geoHeil for the pointer.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Spark parse fixed length file [Java]

Posted by Georg Heiler <ge...@gmail.com>.
I am not 100% sure if spark is smart enough to achieve this using a single
pass over the data. If not you could create a java udf for this which
correctly parses all the columns at once.


Otherwise you could enable Tungsten off heap memory which might speed
things up.
lsn24 <le...@gmail.com> schrieb am Fr. 13. Apr. 2018 um 19:02:

> Hello,
>
>  We are running into issues while trying to process fixed length files
> using
> spark.
>
> The approach we took is as follows:
>
> 1. Read the .bz2 file  into a dataset from hdfs using
> spark.read().textFile() API.Create a temporary view.
>
>      Dataset<String> rawDataset = sparkSession.read().textFile(filePath);
>      rawDataset.createOrReplaceTempView(tempView);
>
> 2. Run a sql query on the view, to slice and dice the data the way we need
> it (using substring).
>
>  (SELECT
>                      TRIM(SUBSTRING(value,1 ,16)) AS record1 ,
>                      TRIM(SUBSTRING(value,17 ,8)) AS record2 ,
>                      TRIM(SUBSTRING(value,25 ,5)) AS record3 ,
>                      TRIM(SUBSTRING(value,30 ,16)) AS record4 ,
>                      CAST(SUBSTRING(value,46 ,8) AS BIGINT) AS record5 ,
>                      CAST(SUBSTRING(value,54 ,6) AS BIGINT) AS record6 ,
>                      CAST(SUBSTRING(value,60 ,3) AS BIGINT) AS record7 ,
>                      CAST(SUBSTRING(value,63 ,6) AS BIGINT) AS record8 ,
>                      TRIM(SUBSTRING(value,69 ,20)) AS record9 ,
>                      TRIM(SUBSTRING(value,89 ,40)) AS record10 ,
>                      TRIM(SUBSTRING(value,129 ,32)) AS record11 ,
>                      TRIM(SUBSTRING(value,161 ,19)) AS record12,
>                      TRIM(SUBSTRING(value,180 ,1)) AS record13 ,
>                      TRIM(SUBSTRING(value,181 ,9)) AS record14 ,
>                      TRIM(SUBSTRING(value,190 ,3)) AS record15 ,
>                      CAST(SUBSTRING(value,193 ,8) AS BIGINT) AS record16 ,
>                      CAST(SUBSTRING(value,201 ,8) AS BIGINT) AS record17
>                      FROM tempView)
>
> 3.Write the output of sql query to a parquet file.
>      loadDataset.write().mode(SaveMode.Append).parquet(outputDirectory);
>
> Problem :
>
>   The step #2 takes a longer time , if the length of line is ~2000
> characters. If each line in the file is only 1000 characters then it takes
> only 4 minutes to process 20 million lines. If we increase the line length
> to 2000 characters it takes 20 minutes to process 20 million lines.
>
>
> Is there a better way in spark to parse fixed length lines?
>
>
> *Note: *Spark version we use is 2.2.0 and we are using  Spark with Java.
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>
>