Posted to jira@arrow.apache.org by "Mahesha Subrahamanya (Jira)" <ji...@apache.org> on 2022/06/16 19:55:00 UTC

[jira] [Comment Edited] (ARROW-16822) Python Error: <>, exitCode: <139> when converting a CSV file to Parquet using the pandas/pyarrow libraries

    [ https://issues.apache.org/jira/browse/ARROW-16822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17555274#comment-17555274 ] 

Mahesha Subrahamanya edited comment on ARROW-16822 at 6/16/22 7:54 PM:
-----------------------------------------------------------------------

!convertCSV2Parquet.png|width=602,height=354!


was (Author: JIRAUSER290869):
!convertCSV2Parquet.png!

> Python Error: <>, exitCode: <139> when converting a CSV file to Parquet using the pandas/pyarrow libraries
> -------------------------------------------------------------------------------------------------
>
>                 Key: ARROW-16822
>                 URL: https://issues.apache.org/jira/browse/ARROW-16822
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Parquet, Python
>    Affects Versions: 5.0.0
>            Reporter: Mahesha Subrahamanya
>            Priority: Blocker
>         Attachments: convertCSV2Parquet.png
>
>
> Our main requirement is to read source files (structured/semi-structured/unstructured) residing in AWS S3 through the AWS Redshift database, so that our customers have direct access to analyze the data quickly and seamlessly for reporting purposes, without defining the schema info for the file.
> We have created a data lake (AWS S3) workspace where our customers dump huge CSV/Parquet files (around 10-15 GB). We have developed a framework that uses the pandas/pyarrow (Parquet) libraries to read source files in chunks, identify the schema (datatype/length), and push it to AWS Glue, so that the AWS Redshift database can talk to the S3 files seamlessly and read them very quickly.
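>
> For reference, the chunked CSV-to-Parquet flow described above boils down to the minimal, self-contained sketch below; the file paths and the chunk size are made-up placeholders, not values taken from the framework:
>
> import pandas as pd
> import pyarrow as pa
> import pyarrow.parquet as pq
>
> pq_writer = None
> try:
>     # Stream the CSV in chunks instead of loading the whole file into memory
>     with pd.read_csv('input.csv', chunksize=100_000, low_memory=False) as reader:
>         for chunk_number, chunk in enumerate(reader, 1):
>             table = pa.Table.from_pandas(chunk, preserve_index=False)
>             if chunk_number == 1:
>                 # Open the Parquet writer once, using the schema inferred from the first chunk;
>                 # later chunks must convert to the same schema or write_table() will raise
>                 pq_writer = pq.ParquetWriter('output.parquet', schema=table.schema, compression='snappy')
>             pq_writer.write_table(table)
> finally:
>     if pq_writer is not None:
>         pq_writer.close()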
>  
> Following is the snippet of the Parquet conversion where I'm getting this error. Please take a look.
>  
>         try:
>             read_csv_args = {'filepath_or_buffer': src_object, 'chunksize': self.chunkSizeLimit, 'encoding': 'UTF-8', 'on_bad_lines': 'error', 'sep': fileDelimiter, 'low_memory': False, 'skip_blank_lines': True, 'memory_map': True}  # 'verbose': True, to enable memory-consumption logging
>             if srcPath.endswith('.gz'):
>                 read_csv_args['compression'] = 'gzip'
>             if fileTextQualifier:
>                 read_csv_args['quotechar'] = fileTextQualifier
>             with pd.read_csv(**read_csv_args) as reader:
>                 for chunk_number, chunk in enumerate(reader, 1):
>                     # To support shape-shifting in the incoming data files, keep only the master-schema columns that are present in the file and drop the rest
>                     if glueMasterSchema is not None:
>                         sessionSchema=copy.deepcopy(glueMasterSchema) #copying using deepcopy() method
>                         chunk.columns = chunk.columns.str.lower() # modifying the column header of all columns to lowercase
>                         fileSchema = list(chunk.columns)
>                         for key in list(sessionSchema):
>                             if key not in fileSchema:
>                                 del sessionSchema[key]
>                         fields = []
>                         for col,dtypes in sessionSchema.items():
>                             fields.append(pa.field(col, dtypes))
>                         glue_schema = pa.schema(fields)
>                         # Identify boolean datatypes and convert them back to STRING, as was done during the BF schema step
>                         for cols in chunk.columns:
>                             try:
>                                 if chunk[cols].dtype =='bool':
>                                     chunk[cols] = chunk[cols].astype('str')
>                                 if chunk[cols].dtype =='object':
>                                     chunk[cols] = chunk[cols].fillna('').astype('str').tolist()
>                             except (ParserError,ValueError,TypeError):
>                                 pass
>                     log.debug("chunk count", chunk_number, "chunk length", len(chunk), 'glue_schema', glue_schema, 'Wrote file', targetKey)
>                     #log.debug("during pandas chunk data ", chunk,"df schemas:", chunk.dtypes)
>                     table = pa.Table.from_pandas(chunk,  schema=glue_schema , preserve_index=False)
>                     log.info('Glue schema:', glue_schema, 'for a file:', targetKey)
>                     log.info('pandas memory utilization during chunk process: ', chunk.memory_usage().sum(), 'Bytes.','\n\n\n')
>                     # Guess the schema of the CSV file from the first chunk
>                     #if pq_writer is None:
>                     if chunk_number == 1:
>                         #parquet_schema = table.schema
>                         # Open a Parquet file for writing
>                         pq_writer = pq.ParquetWriter(targetKey, schema=glue_schema, compression='snappy')  # in PyArrow, Snappy compression generally results in better performance
>                         log.debug("table schema :", pprint.pformat(table.schema).replace('\n', ',').replace('\r', ','),' for:', inputFileName)
>                         # writing the log information into s3://etl_activity
>                         etlActivityLog.append({'tableObjectName': targetDirectory[:-1], 'sourceFileName': inputFileName, 'targetFileName': parquetFileName, 'message': 'File Converted Successfully', 'number of rows processed': str(table.num_rows), 'fileStatus': 'SUCCESS'})
>                         logInfo = self.read_logInfo(etlActivityLog)
>                         self.s3Handle.putObject(s3Client, 'etl_process_all.json', logInfo, bucketName, self.etlJobActivityLogFolder )
>                     # Write CSV chunk to the parquet file
>                     pq_writer.write_table(table)
>                     i += 1
>                 log.info('chunk count:', i, 'for a given file:', targetKey, 'whitelist:', targetDirectory[:-1])
>                 # Close a Parquet file writer
>                 if pq_writer is not None and pq_writer.is_open:
>                     pq_writer.close()
>                     pq_writer = None
>                 s3key = outputDirectory + targetDirectory + parquetFileName
>                 self.s3Handle.waitForFile(s3Client, bucketName, s3key)
>                 log.info('Metadata info:', table.column_names, 'number of columns:', table.num_columns, 'number of rows:', table.num_rows, 'Glue Object Name:', targetDirectory[:-1])
>                 log.debug('Wrote file', targetKey, 'with chunk count:', chunk_number)
>                 log.debug('Stream copy', targetKey, 'to parquet took:', datetime.now() - start_time)
>                 log.info('Final parquet convert:', sys.exc_info())
>         except (EOFError, IOError) as x:
>             log.error("error in source file for EOFError, IOError: %s" % x)
>             raise SystemExit('convert2Parquet EOFError:' + str(sys.exc_info()))
>         except (ValueError, ParserError) as x:
>             log.error("error in source for ValueError, ParserError: %s" % x)
>             raise SystemExit('convert2Parquet ValueError:' + str(sys.exc_info()))
>  
>         finally:
>             if pq_writer is not None and pq_writer.is_open:
>                 pq_writer.close()
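>
> For readers skimming the snippet, the schema-alignment step (deriving glue_schema from the Glue master schema and the columns actually present in a chunk) amounts to the pattern below in isolation; the column names and Arrow types here are made-up examples:
>
> import copy
> import pyarrow as pa
>
> glueMasterSchema = {'id': pa.int64(), 'name': pa.string(), 'amount': pa.float64()}
> fileSchema = ['id', 'name']  # columns present in this particular chunk
>
> sessionSchema = copy.deepcopy(glueMasterSchema)
> for key in list(sessionSchema):
>     # Drop master-schema columns that the incoming file does not have
>     if key not in fileSchema:
>         del sessionSchema[key]
>
> glue_schema = pa.schema([pa.field(col, dtype) for col, dtype in sessionSchema.items()])
> print(glue_schema)  # prints: id: int64  /  name: string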
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)