Posted to jira@arrow.apache.org by "Mahesha Subrahamanya (Jira)" <ji...@apache.org> on 2022/06/13 06:44:00 UTC

[jira] [Created] (ARROW-16822) Python Error: <>, exitCode: <139> when converting CSV file to Parquet using pandas/pyarrow libraries

Mahesha Subrahamanya created ARROW-16822:
--------------------------------------------

             Summary: Python Error: <>, exitCode: <139> when converting CSV file to Parquet using pandas/pyarrow libraries
                 Key: ARROW-16822
                 URL: https://issues.apache.org/jira/browse/ARROW-16822
             Project: Apache Arrow
          Issue Type: Bug
          Components: Parquet, Python
    Affects Versions: 5.0.0
            Reporter: Mahesha Subrahamanya


Our main requirement is to read source files (structured/semi-structured/unstructured) residing in AWS S3 through the AWS Redshift database, so that our customers have direct access to analyze the data quickly and seamlessly for reporting purposes without having to define schema information for each file.

We have created a data lake (AWS S3) workspace where our customers dump huge CSV/Parquet files (around 10-15 GB). We have developed a framework that uses the pandas/pyarrow (Parquet) libraries to read the source files, infer the schema (data type/length), and push it to AWS Glue, so that the AWS Redshift database can talk to the S3 files seamlessly and read them very quickly.
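
The Glue master schema referenced in the snippet below is essentially a mapping of column name to data type; turning such a mapping into a pyarrow schema looks roughly like the following sketch (the column names and types here are purely illustrative, not our actual schema):

# Illustrative only: how a Glue-style {column: type} mapping becomes a pyarrow schema.
import pyarrow as pa

glue_master_schema = {            # hypothetical example values
    'customer_id': pa.int64(),
    'order_date': pa.string(),
    'amount': pa.float64(),
}

glue_schema = pa.schema([pa.field(col, dtype)
                         for col, dtype in glue_master_schema.items()])
print(glue_schema)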

 

Below is the snippet of the Parquet conversion where I'm getting this error; exit code 139 usually means the process was killed by a segmentation fault (SIGSEGV). Please take a look. (A stripped-down sketch of the same conversion path follows the full snippet.)

 

try:  # enclosing try block of the conversion method (shown here so the excerpt parses)
    read_csv_args = {'filepath_or_buffer': src_object, 'chunksize': self.chunkSizeLimit,
                     'encoding': 'UTF-8', 'on_bad_lines': 'error', 'sep': fileDelimiter,
                     'low_memory': False, 'skip_blank_lines': True, 'memory_map': True}
    # 'verbose': True can be added to read_csv_args to enable memory consumption logging

    if srcPath.endswith('.gz'):
        read_csv_args['compression'] = 'gzip'
    if fileTextQualifier:
        read_csv_args['quotechar'] = fileTextQualifier

    with pd.read_csv(**read_csv_args) as reader:
        for chunk_number, chunk in enumerate(reader, 1):

            # To support shape-shifting of the incoming data files, keep only the
            # columns of the Glue master schema that are actually present in the file
            if glueMasterSchema is not None:
                sessionSchema = copy.deepcopy(glueMasterSchema)
                chunk.columns = chunk.columns.str.lower()  # lowercase all column headers
                fileSchema = list(chunk.columns)
                for key in list(sessionSchema):
                    if key not in fileSchema:
                        del sessionSchema[key]

                fields = []
                for col, dtype in sessionSchema.items():
                    fields.append(pa.field(col, dtype))
                glue_schema = pa.schema(fields)

                # Identify boolean columns and convert them back to STRING,
                # which was done during the BF schema step
                for col in chunk.columns:
                    try:
                        if chunk[col].dtype == 'bool':
                            chunk[col] = chunk[col].astype('str')
                        if chunk[col].dtype == 'object':
                            chunk[col] = chunk[col].fillna('').astype('str').tolist()
                    except (ParserError, ValueError, TypeError):
                        pass

            log.debug("chunk count", chunk_number, "chunk length", len(chunk),
                      'glue_schema', glue_schema, 'Wrote file', targetKey)
            # log.debug("during pandas chunk data ", chunk, "df schemas:", chunk.dtypes)
            table = pa.Table.from_pandas(chunk, schema=glue_schema, preserve_index=False)
            log.info('Glue schema:', glue_schema, 'for a file:', targetKey)
            log.info('pandas memory utilization during chunk process: ',
                     chunk.memory_usage().sum(), 'Bytes.', '\n\n\n')

            # Open a Parquet file for writing on the first chunk, using the
            # Glue schema derived above
            if chunk_number == 1:
                # With PyArrow, Snappy compression generally gives better performance
                pq_writer = pq.ParquetWriter(targetKey, schema=glue_schema, compression='snappy')
                log.debug("table schema :",
                          pprint.pformat(table.schema).replace('\n', ',').replace('\r', ','),
                          ' for:', inputFileName)

                # Writing the log information into s3://etl_activity
                etlActivityLog.append({'tableObjectName': targetDirectory[:-1],
                                       'sourceFileName': inputFileName,
                                       'targetFileName': parquetFileName,
                                       'message': 'File Converted Successfully',
                                       'number of rows processed': str(table.num_rows),
                                       'fileStatus': 'SUCCESS'})
                logInfo = self.read_logInfo(etlActivityLog)
                self.s3Handle.putObject(s3Client, 'etl_process_all.json', logInfo,
                                        bucketName, self.etlJobActivityLogFolder)

            # Write the CSV chunk to the Parquet file
            pq_writer.write_table(table)
            i += 1

        log.info('chunk count:', i, 'for a given file:', targetKey, 'whitelist:', targetDirectory[:-1])
        # Close the Parquet file writer
        if pq_writer is not None and pq_writer.is_open:
            pq_writer.close()
            pq_writer = None

        s3key = outputDirectory + targetDirectory + parquetFileName
        self.s3Handle.waitForFile(s3Client, bucketName, s3key)

        log.info('Metadata info:', table.column_names, 'number of columns:', table.num_columns,
                 'number of rows:', table.num_rows, 'Glue Object Name:', targetDirectory[:-1])
        log.debug('Wrote file', targetKey, 'with chunk count:', chunk_number)
        log.debug('Stream copy', targetKey, 'to parquet took:', datetime.now() - start_time)
        log.info('Final parquet convert:', sys.exc_info())

except (EOFError, IOError) as x:
    log.error("error in source file for EOFError, IOError: %s" % x)
    raise SystemExit('convert2Parquet EOFError: ' + str(sys.exc_info()))
except (ValueError, ParserError) as x:
    log.error("error in source file for ValueError, ParserError: %s" % x)
    raise SystemExit('convert2Parquet ValueError: ' + str(sys.exc_info()))

finally:
    if pq_writer is not None and pq_writer.is_open:
        pq_writer.close()
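
For reference, stripped of the S3/Glue plumbing, the conversion path boils down to roughly the minimal sketch below; the file paths, chunk size and the blanket string cast are placeholders/simplifications, not the production values:

# Minimal sketch of the chunked CSV -> Parquet path (placeholder paths and
# values, not the production code).
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

csv_path = 'input.csv'            # placeholder for the S3-sourced file
parquet_path = 'output.parquet'   # placeholder for the target key

pq_writer = None
try:
    with pd.read_csv(csv_path, chunksize=1_000_000, low_memory=False) as reader:
        for chunk_number, chunk in enumerate(reader, 1):
            # cast everything to string here for simplicity; the framework only
            # widens bool/object columns, but the effect on the schema is similar
            chunk = chunk.fillna('').astype(str)
            table = pa.Table.from_pandas(chunk, preserve_index=False)
            if chunk_number == 1:
                pq_writer = pq.ParquetWriter(parquet_path, schema=table.schema,
                                             compression='snappy')
            pq_writer.write_table(table)
finally:
    if pq_writer is not None:
        pq_writer.close()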

 


