Posted to jira@arrow.apache.org by "Mahesha Subrahamanya (Jira)" <ji...@apache.org> on 2022/06/13 06:47:00 UTC
[jira] [Commented] (ARROW-16822) Python Error: <>, exitCode: <139> when converting a CSV file to Parquet using pandas/pyarrow libraries
[ https://issues.apache.org/jira/browse/ARROW-16822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553405#comment-17553405 ]
Mahesha Subrahamanya commented on ARROW-16822:
----------------------------------------------
The following versions were used to reproduce the issue. Please advise whether upgrading to the latest versions of these libraries would address it.
pandas==1.3.3; python_full_version >= "3.7.1"
pyarrow==5.0.0; python_full_version >= "3.6.2" and python_version < "3.10" and python_version >= "3.6"
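For reference, the versions actually loaded in the failing environment can be confirmed with a quick check like the one below (a minimal sketch, not part of the original job code):

    # Print the pandas and pyarrow versions seen by the failing process
    import pandas as pd
    import pyarrow as pa

    print("pandas :", pd.__version__)
    print("pyarrow:", pa.__version__)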
> Python Error: <>, exitCode: <139> when converting a CSV file to Parquet using pandas/pyarrow libraries
> -------------------------------------------------------------------------------------------------
>
> Key: ARROW-16822
> URL: https://issues.apache.org/jira/browse/ARROW-16822
> Project: Apache Arrow
> Issue Type: Bug
> Components: Parquet, Python
> Affects Versions: 5.0.0
> Reporter: Mahesha Subrahamanya
> Priority: Blocker
>
> Our main requirement is to read source files (structured/semi-structured/unstructured) residing in AWS S3 through the AWS Redshift database, so that our customers have direct access to analyze the data very quickly and seamlessly for reporting purposes, without having to define schema info for each file.
> We have created a data lake (AWS S3) workspace where our customers dump huge CSV/Parquet files (around 10-15 GB). We have developed a framework that uses the pandas/pyarrow (Parquet) libraries to read the source files in chunks, identify the schema (data type/length), and push it to AWS Glue, so that the AWS Redshift database can talk to the S3 files seamlessly and read them very quickly.
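> At its core this is the usual chunked CSV-to-Parquet pattern; a minimal, self-contained sketch of that pattern is shown here (src_path, target_path and the chunk size are illustrative placeholders, not the framework's real variables):
>
> import pandas as pd
> import pyarrow as pa
> import pyarrow.parquet as pq
>
> src_path = 'input.csv'          # illustrative source file
> target_path = 'output.parquet'  # illustrative target file
> pq_writer = None
> # Read the CSV in chunks and append each chunk to a single Parquet file
> with pd.read_csv(src_path, chunksize=1_000_000) as reader:
>     for chunk in reader:
>         table = pa.Table.from_pandas(chunk, preserve_index=False)
>         if pq_writer is None:
>             # Open the writer once, using the schema inferred from the first chunk
>             pq_writer = pq.ParquetWriter(target_path, schema=table.schema, compression='snappy')
>         pq_writer.write_table(table)
> if pq_writer is not None:
>     pq_writer.close()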
>
> Following is the snippet of the Parquet conversion where I'm getting this error. Please take a look:
>
> try:
>     read_csv_args = {'filepath_or_buffer': src_object, 'chunksize': self.chunkSizeLimit, 'encoding': 'UTF-8',
>                      'on_bad_lines': 'error', 'sep': fileDelimiter, 'low_memory': False,
>                      'skip_blank_lines': True, 'memory_map': True}  # add 'verbose': True to enable memory consumption logging
>     if srcPath.endswith('.gz'):
>         read_csv_args['compression'] = 'gzip'
>     if fileTextQualifier:
>         read_csv_args['quotechar'] = fileTextQualifier
>     with pd.read_csv(**read_csv_args) as reader:
>         for chunk_number, chunk in enumerate(reader, 1):
>             # To support shape-shifting of incoming data files, keep only the schema entries whose columns exist in the file
>             if glueMasterSchema is not None:
>                 sessionSchema = copy.deepcopy(glueMasterSchema)  # copy using deepcopy()
>                 chunk.columns = chunk.columns.str.lower()  # lowercase all column headers
>                 fileSchema = list(chunk.columns)
>                 for key in list(sessionSchema):
>                     if key not in fileSchema:
>                         del sessionSchema[key]
>                 fields = []
>                 for col, dtype in sessionSchema.items():
>                     fields.append(pa.field(col, dtype))
>                 glue_schema = pa.schema(fields)
>             # Identify boolean columns and convert them back to STRING, as was done while building the schema
>             for cols in chunk.columns:
>                 try:
>                     if chunk[cols].dtype == 'bool':
>                         chunk[cols] = chunk[cols].astype('str')
>                     if chunk[cols].dtype == 'object':
>                         chunk[cols] = chunk[cols].fillna('').astype('str')
>                 except (ParserError, ValueError, TypeError):
>                     pass
>             log.debug("chunk count", chunk_number, "chunk length", len(chunk), 'glue_schema', glue_schema, 'Wrote file', targetKey)
>             # log.debug("during pandas chunk data ", chunk, "df schemas:", chunk.dtypes)
>             table = pa.Table.from_pandas(chunk, schema=glue_schema, preserve_index=False)
>             log.info('Glue schema:', glue_schema, 'for a file:', targetKey)
>             log.info('pandas memory utilization during chunk process: ', chunk.memory_usage().sum(), 'Bytes.')
>             # Use the schema guessed from the first chunk and open a Parquet file for writing
>             if chunk_number == 1:
>                 pq_writer = pq.ParquetWriter(targetKey, schema=glue_schema, compression='snappy')  # in PyArrow, Snappy generally results in better performance
>                 log.debug("table schema:", pprint.pformat(table.schema).replace('\n', ',').replace('\r', ','), 'for:', inputFileName)
>                 # Write the log information into s3://etl_activity
>                 etlActivityLog.append({'tableObjectName': targetDirectory[:-1], 'sourceFileName': inputFileName,
>                                        'targetFileName': parquetFileName, 'message': 'File Converted Successfully',
>                                        'number of rows processed': str(table.num_rows), 'fileStatus': 'SUCCESS'})
>                 logInfo = self.read_logInfo(etlActivityLog)
>                 self.s3Handle.putObject(s3Client, 'etl_process_all.json', logInfo, bucketName, self.etlJobActivityLogFolder)
>             # Write the CSV chunk to the Parquet file
>             pq_writer.write_table(table)
>             log.info('chunk count:', chunk_number, 'for a given file:', targetKey, 'whitelist:', targetDirectory[:-1])
>     # Close the Parquet file writer
>     if pq_writer is not None and pq_writer.is_open:
>         pq_writer.close()
>         pq_writer = None
>     s3key = outputDirectory + targetDirectory + parquetFileName
>     self.s3Handle.waitForFile(s3Client, bucketName, s3key)
>     log.info('Metadata info:', table.column_names, 'number of columns:', table.num_columns, 'number of rows:', table.num_rows, 'Glue Object Name:', targetDirectory[:-1])
>     log.debug('Wrote file', targetKey, 'with chunk count:', chunk_number)
>     log.debug('Stream copy', targetKey, 'to parquet took:', datetime.now() - start_time)
>     log.info('Final parquet convert:', sys.exc_info())
> except (EOFError, IOError) as x:
>     log.error("error in source file for EOFError, IOError: %s" % x)
>     raise SystemExit('convert2Parquet EOFError: ' + str(sys.exc_info()))
> except (ValueError, ParserError) as x:
>     log.error("error in source for ValueError, ParserError: %s" % x)
>     raise SystemExit('convert2Parquet valueError: ' + str(sys.exc_info()))
> finally:
>     if pq_writer is not None and pq_writer.is_open:
>         pq_writer.close()
>
--
This message was sent by Atlassian Jira
(v8.20.7#820007)