Posted to c-dev@xerces.apache.org by Stefan Hummert <St...@SCHOBER.DE> on 2002/06/04 17:44:20 UTC

Parser pipe / fork problem (reading from stdin over pipe from forked processes)

Xerces C++ Bug Report:

Xerces-C++ version number: 	1.5.1
Platform:	 AIX
Operating system and version number: 	4.3.3 Maintenance Level 9
Compiler and version number: 	xlC 3.6.6.0
The XML document (or excerpt) that failed:	see description
The C++ application code that failed 	see description
Whether you built the Xerces-C++ library
    yourself or used the binary distribution:	both tested, same error

What happened:

First C++ Code: (short parser code)

    SAX2DocHandler handler;
    handler.setControl(this, pPrnDM);
    parser->setContentHandler(&handler);
    parser->setErrorHandler(&handler);
  
    try
    {
        if (inFilename.isEmpty())
        {
            StdInInputSource src;
            parser->parse(src);
        }
        else
        {
            parser->parse(inFilename.c_str());
        }
    }
    catch (const XMLException& e)
    {
        cerr << "\nError during parsing: \n"
            << "Exception message is:  \n"
            << StrX(e.getMessage()) << "\n" << endl;
        return false;
    }
    catch (...)
    {
        cerr << "\nUnexpected exception during parsing: \n";
        return false;
    }

Main program code: (listed after the description)

Description:

I have written an application that passes a large amount of data around.
The first part generates XML data, the second applies special
formatting, and the third produces special output.
Until now, the first part wrote
to a file, the second part read that file and wrote another file,
and the third part read the file created by the second part
and wrote the final file.
That all worked, but once the files became very large (>2 GB) it
took a long time, and there was the limitation that an internal component (from an external source)
could not handle files >2 GB. (This is the Apache Xerces C++ parser; perhaps it could handle files that large, but only if compiled in a 64-bit environment,
which we do not want to do.)
So I decided to do all the steps in one run, using pipes,
so that each part now passes its data through a pipe to the next.
This seemed to work fine until a lot of data went over the pipe:
for "small" examples it works, but for the
"large" ones it doesn't.

In short, the following program does this:
There is one writer that forks all the second-stage processes, and each second-stage process forks
a third-stage process if necessary. All of them open their pipes first; once all processes have been created, the writer (after the loop) begins to write and the others process the data.
I have already trimmed the code so that it is easier to follow (removed cout and cerr messages and the like).
In the simplest case there is one writer, one "lieferformproc" process in between (second stage), and one output process (third stage).
The code does work fine for small amounts of data (less than about 500 MB).

The problem may also be memory: I watched the processes' memory usage in the topas process list. When a process works with files it needs about 10 to 20 MB, but when it works with pipes as described here, its memory usage grows as it processes data, which could be a reason why it stops. It reaches up to 250 MB per process.

So the writers to the pipes are my processes, and the reader is always the Xerces parser.

The following error appears:
  Fatal Error at file stdin, line 19846220, char 16
  Message: An exception occured! Type:XMLPlatformException, Message:Could not read data
  from file

There is NO error in the XML.
(I checked this by also writing all the data that goes to the pipes to a file.)
It seems that the input from stdin stopped for an unknown reason.
So I really don't know whether it is a pipe problem or a Xerces parser problem.
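
One way to narrow down whether the pipe or the parser is at fault would be to run a plain reader in place of the parser on the same stdin. The following is a minimal sketch under that assumption; drainFd is a hypothetical helper, not part of Xerces-C++. It reads until end of file, retries reads that were interrupted by a signal (EINTR), and reports the errno of any other failed read.

```cpp
#include <cerrno>
#include <unistd.h>

// Hypothetical diagnostic: read fd until end of file, counting the bytes.
// Returns 0 on a clean end of file, otherwise the errno of the failing read().
int drainFd(int fd, unsigned long long& totalBytes)
{
    char buf[65536];
    totalBytes = 0;
    for (;;)
    {
        ssize_t n = read(fd, buf, sizeof buf);
        if (n > 0)
        {
            totalBytes += static_cast<unsigned long long>(n);
        }
        else if (n == 0)
        {
            return 0;           // every writer has closed its end of the pipe
        }
        else if (errno != EINTR)
        {
            return errno;       // a real read error
        }
        // EINTR: the read was interrupted by a signal before any data
        // arrived, so simply try again.
    }
}
```

If such a reader, called as drainFd(0, total) in the child, consumes the whole stream and returns 0 where the parser fails, that would point toward the parser's input handling rather than the pipe itself.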

Perhaps you have an idea and could answer (yes, I know, it is really a bit complex).

Here is the main code (with forks and pipes):

fds = new int* [anzLF];

countChilds = 0;  // reset / initialize global variable
for (i=0; i<anzLF; i++)
{

        // prepare the pipe and the fork for the lfp (lieferformprozessor, the delivery-format processor)
        fds[i] = new int [2];   // file descriptors for pipe

        if (pipe(fds[i]) != 0)  // create pipe
        {
            errOccured++;
            break; 
        }

        pid_t lfpid = fork();   // duplicate the process
        if (lfpid == -1)
        {
            errOccured++;
            break;  // return(_ERROR);
        }
        else if (lfpid != 0)    // parent process 
        {
            anzChilds++;        // child created successfully
            close(fds[i][0]);   // close read end of the pipe
            od_dataproc.pFiles[i] = fdopen(fds[i][1], "wb");
            if (od_dataproc.pFiles[i] == NULL)
            {
                errOccured++;
                break;
            }
        }
        else    // child process 
        {
            close(fds[i][1]);   // close write end of the pipe
            if (dup2(fds[i][0],0) != 0)
            {
                _exit(1);
            }

            int errLFP = 0;         // counts the errors that occurred
            int lfp_fds [2];        // file descriptors for the pipe from the lieferformprozessor to the output processor
            SOutputdirector  od_lfp;
            
            if ( ( pDelForm[i]->getFormatType() != POTSTAT ) && 
                 ( taskRef.getJobTyp() == jobTypeSelection  ) ) 
            {
                if (pipe(lfp_fds) != 0)  // create pipe
                {
                    _exit(1);
                }

                pid_t opwid = fork();   // duplicate the process
                if (opwid == -1)
                {
                     _exit(1);
                }
                else if (opwid != 0)    // parent process 
                {
                    close(lfp_fds[0]);   // close read end of the pipe
                    od_lfp.initAnz(1 + do_debug_out);
                    od_lfp.pFiles[0] = fdopen(lfp_fds[1], "wb");
                    if (do_debug_out != 0)
                    {
                        od_lfp.pFiles[1] = fopen(outputFileLFG.c_str() , "wb");
                        if ( od_lfp.pFiles[1] == NULL)
                        {
                           errLFP++;
                        }
                    }
                    if (od_lfp.pFiles[0] == NULL)
                    {
                        errLFP++;
                    }
                }
                else    // child process 
                {
                    close(lfp_fds[1]);   // close write end of the pipe
                    if (dup2(lfp_fds[0],0) != 0)    
                    {
                         _exit(1);
                    }
                
                    SOutProcWorker      *pWorker = NULL;
                    opwParams           param;

                    pWorker = SOutProcWorker::create( pDelForm[i]->getFormatType() , param);
                    if (pWorker == NULL)
                    {
                        _exit(1);
                    }
       
                    if ( pWorker->startWork() )
                    {
                        cout << "MSG: File parsed normally. (Output Processor) LF = " << i << endl;
                    }
                    else
                    {
                        delete pWorker;
                        _exit(1); //return(_ERROR);
                    } 
                    delete pWorker;
                    _exit(0);   // everything ok, exit without errors
                }
                    
            }   // END IF output processor?
            else
            {
                 cout << "MSG: r" << endl;
            }
           
            bool retok = false;
            if (errLFP == 0)
            {   // only if no errors have occurred so far!
                retok = pDelFormProc->setDelform(i+1,*pDelForm[i], "", &od_lfp );
            }
            // cleanup
            od_lfp.close();
            
            if(! retok)
            {
               errLFP++;
            }

            if ( ! waitforChild() )
            {
                errLFP++;
            }

            _exit(errLFP);
        }   // end of else CHILD LIEFERFORMPROZESSOR
        
}

    // The dataproc starts once all child processes have been created and are
    // waiting for input.
    if ( do_debug_out != 0)
    { // DEBUG OUTPUT OF THE DATAPROCESSOR as a FILE
        od_dataproc.pFiles[anzLF] = fopen(outputFile.c_str(), "wb");
        if (od_dataproc.pFiles[anzLF] == NULL)
        {
           errOccured++;
        }
    }

    if (od_dataproc.check() != 0) // pOut == NULL)
    {
        errOccured++; 
    }

    // first processing step => data
    pInFile = new SPrnFile(inputFile.c_str());
    
    if (errOccured == 0)
    {
        if (!doData(pInFile, &od_dataproc)) //pOut))
        {
           errOccured++;
        }
    }
    // close files, free memory
    delete pInFile;
    //fclose(pOut);
    od_dataproc.close();
    for (i = 0; i < anzLF; i++)
    {
        delete [] fds[i];
    }
    delete [] fds;

    while (countChilds < anzChilds)
    {
        if ( ! waitforChild() )
        {
           errOccured++;
        }
    }
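
The body of waitforChild() is not shown above. As an illustration only, a loop that collects the exit statuses of the children could look like the following sketch; reapChildren is a hypothetical name, and POSIX waitpid() is assumed.

```cpp
#include <cerrno>
#include <sys/types.h>
#include <sys/wait.h>

// Hypothetical stand-in for the loop around waitforChild(): wait for
// `expected` children and return how many of them did not exit normally
// with status 0. Returns -1 if waitpid() itself fails.
int reapChildren(int expected)
{
    int failed = 0;
    for (int reaped = 0; reaped < expected; )
    {
        int status = 0;
        pid_t pid = waitpid(-1, &status, 0);
        if (pid == -1)
        {
            if (errno == EINTR)
                continue;       // interrupted by a signal, try again
            return -1;          // for example ECHILD: no child left to wait for
        }
        ++reaped;
        if (!WIFEXITED(status) || WEXITSTATUS(status) != 0)
            ++failed;           // killed by a signal, or non-zero exit code
    }
    return failed;
}
```

Checking WIFEXITED separately from the exit code makes it visible when a child was terminated by a signal rather than by its own _exit() call.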



Stefan Hummert (SIS/hu)

Schober Informationssysteme GmbH
Max-Eyth-Straße 6-10
D-71254 Ditzingen
Tel.:   07156-304-493
Fax:   07156-304-310
EMail:  stefan.hummert@schober.de
