Posted to issues@cxf.apache.org by "avidd (JIRA)" <ji...@apache.org> on 2016/02/15 15:45:18 UTC

[jira] [Created] (CXF-6776) MTOM client receives bogus

avidd created CXF-6776:
--------------------------

             Summary: MTOM client receives bogus
                 Key: CXF-6776
                 URL: https://issues.apache.org/jira/browse/CXF-6776
             Project: CXF
          Issue Type: Bug
    Affects Versions: 3.1.5, 3.1.1
         Environment: Windows 7 Professional 64 bit, java 1.8.0_45
            Reporter: avidd


We have a SOAP web service that transmits large structured result sets (a list of rows, each of which is a List<String>) using MTOM. We have now migrated to CXF because of another bug in jaxws-spring. This part of the code seemed to work fine with jaxws-spring, but we had strange problems there as well, which we traced down to the lower layers. Now we have the following situation:

There are test cases that return result sets of several thousand rows, each of which has several columns. The result set is "streamed" via MTOM. These tests fail sporadically, roughly once every 10 to 20 runs.
This is what I have found out so far:
* This is reproducible on different machines.
* I first saw this with CXF 3.1.1. Then I upgraded to 3.1.5 and it still occurs. As said before there were similar bugs with jaxws-spring which showed different failures.
* The server cleanly writes all results into the output and flushes the output stream before closing it
* Both client (a JUnit test) and server (a Jetty server started by that test) run on the same machine, in fact in the same VM.
* If I enable some of the logging in the class below (QueryResponse), this seems to reduce the probability of the error. I was not able to reproduce the error with the logging enabled. This is very frustrating and I think it hints at some race condition.

Our streaming result encodes rows like this:
* 4 bytes stating the number of columns in that row as an int (trailing nulls are cut off)
* for each column: 4 bytes stating the number of bytes in that column's value as an int
* followed by that number of bytes, which are converted to a string
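
To make the layout above concrete, here is a minimal sketch of the encoding of a single row. The class and method names are made up for this illustration, and UTF-8 is an assumption: the QueryResponse class listed further down uses the platform default charset.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only, not part of QueryResponse.
final class RowEncodingSketch {

  /** Encodes one row: the column count, then (byte length, bytes) for each column. */
  static byte[] encodeRow(List<String> row) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    writeInt(out, row.size());
    for (String value : row) {
      // Assumption: UTF-8; null values are written as empty strings
      byte[] bytes = (value == null ? "" : value).getBytes(StandardCharsets.UTF_8);
      writeInt(out, bytes.length);
      out.write(bytes, 0, bytes.length);
    }
    return out.toByteArray();
  }

  /** Writes an int as 4 bytes in big-endian order (the ByteBuffer default). */
  private static void writeInt(ByteArrayOutputStream out, int value) {
    out.write(ByteBuffer.allocate(4).putInt(value).array(), 0, 4);
  }

  public static void main(String[] args) {
    // prints [0, 0, 0, 2, 0, 0, 0, 2, 66, 65, 0, 0, 0, 1, 49]
    System.out.println(Arrays.toString(encodeRow(Arrays.asList("BA", "1"))));
  }
}
```

A two-column row ("BA", "1") therefore takes 4 + (4 + 2) + (4 + 1) = 15 bytes.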

*The test*
* If the test runs fine, the client receives 2077 rows containing 15 columns each.
* Each column is a double or a string, it's basically the result of a simple SQL query. (Of course, if it actually were SQL we would use JDBC. It's actually a SAP data warehouse.)
* If the test fails, which it only does sporadically, it is always the 14th row failing.

Example of failure: 
When row 14 of test 2 is received correctly on the client side, these are its bytes:
* first 4 bytes: we have 15 columns
* next 4 bytes: the first column has 2 bytes
* next 2 bytes: "BA"
{code}
[0, 0, 0, 15, 0, 0, 0, 2, 66, 65, 0, 0, 0, 1, 49, 0, 0, 0, 4, ...]
{code}
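
As a cross-check, the complete fields in the excerpt above can be decoded by hand. This is a minimal sketch with made-up names that only reads the first 15 bytes (the column count and the two columns that are fully contained in the excerpt); UTF-8 is an assumption.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only, not part of QueryResponse.
final class RowPrefixDecodingSketch {

  /** Reads the column count, then at most {@code columnsToRead} length-prefixed columns. */
  static List<String> decodePrefix(byte[] data, int columnsToRead) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
    int columnCount = in.readInt();              // 4 bytes, big-endian
    List<String> values = new ArrayList<>();
    for (int col = 0; col < Math.min(columnCount, columnsToRead); col++) {
      byte[] bytes = new byte[in.readInt()];     // 4-byte length of the value
      in.readFully(bytes);                       // throws EOFException if the data ends early
      values.add(new String(bytes, StandardCharsets.UTF_8)); // assumption: UTF-8
    }
    return values;
  }

  public static void main(String[] args) throws IOException {
    byte[] prefix = {0, 0, 0, 15, 0, 0, 0, 2, 66, 65, 0, 0, 0, 1, 49};
    System.out.println(decodePrefix(prefix, 2)); // prints [BA, 1]
  }
}
```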

These are the erroneous bytes as received on the client side:
* first 4 bytes: we have 15 columns
* next 4 bytes: the first column has 2 bytes
* next 2 bytes: "BM"
{code}
[0, 0, 0, 15, 0, 0, 0, 2, 66, 77, 71, 0, 0, 0, ...]
{code}

At first glance, it looks like a byte was added (because of the three consecutive 0s). So I thought it may be an off-by-one error. But when looking at the binary representation I see:
{code}
good: 65:     01000001
bad:  77, 71: 01001101 01000111
{code}
I can't see where a byte would have been inserted. It also doesn't look like only single bits were flipped. It rather looks like complete garbage to me.
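
The comparison can be reproduced with a few lines of code. This is a minimal sketch with made-up names; it prints the bit patterns of the bytes in question, the bits in which 65 and 77 differ, and the value a reader would get if it took the four bytes following "BM" in the erroneous excerpt (71, 0, 0, 0) as the next big-endian length field.

```java
// Hypothetical helper for illustration only.
final class BitPatternSketch {

  /** Formats the low 8 bits of a value as a zero-padded binary string. */
  static String toBits(int value) {
    String bits = Integer.toBinaryString(value & 0xFF);
    return "00000000".substring(bits.length()) + bits;
  }

  /** Interprets four bytes as a big-endian int, like a length field of the format. */
  static int toInt(int b0, int b1, int b2, int b3) {
    return ((b0 & 0xFF) << 24) | ((b1 & 0xFF) << 16) | ((b2 & 0xFF) << 8) | (b3 & 0xFF);
  }

  public static void main(String[] args) {
    System.out.println("65      = " + toBits(65));      // 01000001
    System.out.println("77      = " + toBits(77));      // 01001101
    System.out.println("71      = " + toBits(71));      // 01000111
    System.out.println("65 ^ 77 = " + toBits(65 ^ 77)); // 00001100, the differing bits
    System.out.println("71,0,0,0 as int = " + toInt(71, 0, 0, 0)); // 1191182336
  }
}
```

Read as a length field, 71, 0, 0, 0 would announce a column value of more than a gigabyte, so the decoder is out of sync with the stream from this point on.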

Similar but different problems first occurred when we upgraded from Java 7 to Java 8 a year ago. Before that, everything was fine. At that time we were using jaxws-spring and sometimes the MIME boundary at the end of the attachment was missing. We hoped that an upgrade to CXF would fix the problem, but now we even have issues with the data itself.
To me it looks like something is totally broken in the JRE but this only shows up when using MTOM, so it may be some integration problem. I wonder whether we are the only ones seeing this behavior.

This is my "streaming result set" class:

{code}
@XmlAccessorType ( XmlAccessType.NONE )
@XmlRootElement ( name = "streamResponse" )
@XmlType ( name = "streamResponseType" )
public class QueryResponse {
  private static final String MIME_TYPE_OCTET_STREAM = "application/octet-stream";
  private static final Logger LOG = LoggerFactory.getLogger(QueryResponse.class);
  private static final Marker FATAL = MarkerFactory.getMarker("FATAL");
  private static final ObjectPool<String> STRINGS = ObjectPool.stringPool();
  private static final int BUFFER_SIZE = 100;
  private static final int PREMATURE_END_OF_RESULT = -1;
  private static final byte[] PREMATURE_END_OF_RESULT_BYTES = 
      ByteBuffer.allocate(4).putInt(PREMATURE_END_OF_RESULT).array();
  private volatile int totalBytesRead = 0;
  private volatile int rowsRead = 0;
  private volatile int rowsWritten = 0;

  private DataHandler results;
  private BlockingQueue<List<String>> resultSet;
  private ResultSetIterator<String> resultSetIterator;
  private boolean receiving = true;

  /** Create a new response. This constructor is used for unmarshalling the response. */
  QueryResponse() { }
  
  /**
   * Create a new result set encapsulating the given components.
   * @param aResults the result iterator
   */
  QueryResponse(AutoCloseableIterator<List<String>> aResults) {
    results = encode(aResults);
  }
  
  @XmlElement ( required = true )
  @XmlMimeType ( MIME_TYPE_OCTET_STREAM )
  DataHandler getResults() {
    return results;
  }
  
  /**
   * Set the result set, called by JAXB.
   * @param aDataHandler the data handler
   */
  void setResults(DataHandler aDataHandler) {
    if ( aDataHandler == null ) { throw new NullPointerException("aDataHandler"); }
    if ( resultSet != null ) { throw new IllegalStateException("Result set already exists."); }
    results = aDataHandler;

    // Pipelining
    /* parse results and fill queue while loading from the network */
    resultSet = new ArrayBlockingQueue<List<String>>(BUFFER_SIZE);
    resultSetIterator = new ResultSetIterator<String>(resultSet);
    DataHandler dataHandler = results;
    try {
      decode(dataHandler, resultSet, resultSetIterator);
    } catch ( InterruptedException e ) {
      Thread.currentThread().interrupt();
    }
  }
  
  /**
   * Called on the client side to get a streaming and blocking iterator.
   * @return the result set as a blocking iterator
   */
  public Iterator<List<String>> getResultSet() {
    return resultSetIterator; 
  }
  
  private int fill(byte[] bytes, InputStream in) throws IOException {
    int off = 0;
    int readCount = 0;
    do {
      // read() returns at least one byte (readCount > 0) if bytes.length - off > 0 and the stream is not at EOF
      readCount = in.read(bytes, off, bytes.length - off);  
      off += readCount;
      if ( readCount > 0 ) {
        totalBytesRead += readCount;
      }
    } while ( readCount > 0 && off < bytes.length );
    if ( off > 0 && off < bytes.length ) { // end of stream is a correct termination
      try {
        readCount = in.read(bytes, off, bytes.length - off);
        String exception = readException(in);
        throw new RuntimeException(exception);
      } catch ( EOFException e ) {
        // There was no exception written by the server, just a premature end of the stream causing the client-side EOF.
        throw new RuntimeException("Premature end of stream, total bytes read: " + totalBytesRead); 
      }
    }
    return off;
  }

  private static void checkException(int len, InputStream in) throws ClassNotFoundException, IOException {
    if ( len == PREMATURE_END_OF_RESULT ) {
      String exception = readException(in);
      throw new RuntimeException(exception);
    }
  }

  private static String readException(InputStream in) throws IOException {
    ObjectInputStream objIn = new ObjectInputStream(in);
    try {
      Object object = objIn.readObject();
      if ( object != null ) {
        return object.toString();
      }
    } catch ( ClassNotFoundException e ) {
      throw new RuntimeException("Could not read exception.", e);
    }
    return "No exception received from service after premature end of result";
  }

  private DataHandler encode(AutoCloseableIterator<List<String>> aResults) {
    assert ( aResults != null );
    final PipedOutputStream out = new PipedOutputStream();
    DataHandler dh = new DataHandler(new StreamDataSource(out, MIME_TYPE_OCTET_STREAM));
    Encoder encoder = new Encoder(out, aResults, new ServerExceptionHandler());
    new Thread(encoder).start();
    return dh;
  }
  
  private void decode(
      DataHandler dataHandler, final BlockingQueue<List<String>> aResultSet, ExceptionHandler exceptionHandler) 
      throws InterruptedException {
    Decoder decoder = new Decoder(dataHandler, aResultSet, exceptionHandler);
    new Thread(decoder).start();
  }
  
  private void awaitIteratorBufferNotFull() throws InterruptedException {
    while ( resultSet.remainingCapacity() == 0 ) { resultSet.wait(); }
  }

  private void awaitElements() throws InterruptedException {
    while ( receiving && resultSet.isEmpty() ) { resultSet.wait(); }
  }

  private static final class StreamDataSource implements DataSource {
    private final String name = UUID.randomUUID().toString();
    private final InputStream in;
    private final String mimeType;

    private StreamDataSource(PipedOutputStream aOut, String aMimeType) {
      ArgumentChecks.checkNotNull(aOut, "aOut");
      ArgumentChecks.checkNotNull(aMimeType, "aMimeType");
      try {
        in = new PipedInputStream(aOut);
      } catch ( IOException e ) {
        throw new RuntimeException("Could not create input stream.", e);
      }
      mimeType = aMimeType;
    }

    @Override public String getName() { return name; }

    @Override public String getContentType() { return mimeType; }

    /**
     * {@inheritDoc}
     * 
     * This implementation violates the specification in that it is destructive. Only the first call will return an
     * appropriate input stream.
     */
    @Override public InputStream getInputStream() { return in; }

    @Override public OutputStream getOutputStream() { throw new UnsupportedOperationException(); }
  }
  
  /**
   * Decodes the contents of an input stream as written by the {@link com.tn_ag.sap.QueryResponse.Encoder} and writes
   * parsed rows to a {@link java.util.Queue}.
   */
  private class Decoder implements Runnable {
    private final DataHandler dataHandler;
    private final BlockingQueue<List<String>> resultSet;
    private final ExceptionHandler exceptionHandler;
    private final byte[] lenBytes = new byte[4];

    Decoder(DataHandler aDataHandler, BlockingQueue<List<String>> aResultSet, ExceptionHandler aHandler) {
      ArgumentChecks.checkNotNull(aDataHandler, "aDataHandler");
      ArgumentChecks.checkNotNull(aResultSet, "aResultSet");
      ArgumentChecks.checkNotNull(aHandler, "aHandler");
      dataHandler = aDataHandler;
      resultSet = aResultSet;
      exceptionHandler = aHandler;
    }
    
    @Override
    public void run() {
      InputStream in = null;
      List<String> row;
      int len;
      try { 
        in = dataHandler.getInputStream();
        
        while ( receiving ) {
          synchronized ( resultSet ) {
            receiving = fill(lenBytes, in) > 0;       // read next row's length in number of columns 
            len = ByteBuffer.wrap(lenBytes).getInt(); // convert row length to integer
            if ( !receiving || len == 0 ) { break; }
            checkException(len, in);                  // -1 signals an exception

            row = readRow(in, len);
            awaitIteratorBufferNotFull();
            resultSet.put(row);
            rowsRead++;
            if ( rowsRead % 1000 == 0 ) {
              LOG.debug("already received {} rows", rowsRead);
            }
            resultSet.notifyAll();                    // notify waiting consumer threads
          }
        }
        stopReception();
        LOG.debug("received a total of {} rows.", rowsRead);
      } catch ( InterruptedException e ) {
        LOG.info("Result reception interrupted.");
      } catch ( Exception e ) {
        exceptionHandler.handle(e);
        stopReception();
      } finally {
        receiving = false;
        try {
          if ( in != null ) { in.close(); }
        } catch ( IOException e ) {
          exceptionHandler.handle(e);
        }
      }
    }

    private List<String> readRow(InputStream in, int len) throws IOException {
      List<String> row = new ArrayList<>(len);           // create list of appropriate length
      for ( int col = 0; col < len; col++ ) {            // for each column              
        fill(lenBytes, in);                              // read the value length (fixed for some types if schema known)
        int valLen = ByteBuffer.wrap(lenBytes).getInt(); // convert the length of the value as bytes to an int
        final byte[] bytes = new byte[valLen];           // allocate a buffer of exactly the required size
        fill(bytes, in);              
        row.add(STRINGS.internalize(new String(bytes)));
      }
      return row;
    }

    private void stopReception() {
      synchronized ( resultSet ) {
        receiving = false;           // we will stop parsing now
        resultSet.notifyAll();       // consumers can now consume the remaining elements from the queue
        LOG.debug("Read " + rowsRead + " rows from binary stream");
      }                
    }
  }
  
  /**
   * Encodes the given result set and writes the result to an output stream. 
   */
  private class Encoder implements Runnable {
    private final OutputStream out;
    private final AutoCloseableIterator<List<String>> iterator;
    private final ExceptionHandler handler;

    Encoder(OutputStream aOut, AutoCloseableIterator<List<String>> aResults, ExceptionHandler aHandler) {
      ArgumentChecks.checkNotNull(aOut, "aOut");
      ArgumentChecks.checkNotNull(aResults, "aResults");
      ArgumentChecks.checkNotNull(aHandler, "aHandler");
      out = aOut;
      iterator = aResults;
      handler = aHandler;
    }

    @Override
    public void run() {
      try ( AutoCloseableIterator<List<String>> iter = this.iterator; OutputStream out = this.out ) {
        writeResultSet(iter, out);
        out.flush();
      } catch ( Exception e ) {
        handler.handle(e);
      }
    }

    private void writeResultSet(AutoCloseableIterator<List<String>> iter, OutputStream out2) throws IOException {
      List<String> row = null;
      while ( this.iterator.hasNext() ) {
        try {
          row = this.iterator.next();
          writeRow(row);
          rowsWritten++;
        } catch ( Exception e ) {
          out.write(PREMATURE_END_OF_RESULT_BYTES); // write -1, signaling an exception instead of row size
          writeException(out, e);                   // send exception to client for rethrowing it there
          throw e;                                  // stop transmission, handle exception on server-side (logging)
        }
      }
      LOG.info("wrote {} rows to binary stream, closing output stream", rowsWritten);
    }

    private void writeRow(List<String> row) throws IOException {
      out.write(ByteBuffer.allocate(4).putInt(row.size()).array());     // write row size (in number of columns)
      for ( String s : row ) {
        if ( s == null ) { s = ""; }
        byte[] bytes = s.getBytes();
        out.write(ByteBuffer.allocate(4).putInt(bytes.length).array()); // write size of column in bytes
        out.write(bytes);                                               // write column value
      }
    }

    private void writeException(OutputStream output, Exception e) throws IOException {
      ObjectOutputStream objOut = new ObjectOutputStream(output);
      objOut.writeObject(toString(e));
    }

    private String toString(Throwable e) {
      if ( e instanceof UncheckedConnectionException && e.getCause() != null ) {
        e = e.getCause();
      }
      ByteArrayOutputStream stackTraceOut = new ByteArrayOutputStream();
      PrintWriter writer = new PrintWriter(stackTraceOut);
      e.printStackTrace(writer);
      writer.flush();
      return new String(stackTraceOut.toByteArray());
    }
  }
  
  private final class ResultSetIterator<T> extends ExceptionHandler implements Iterator<List<T>> {
    private final BlockingQueue<List<T>> queue;
    private Exception exception;
    private int returned = 0;

    private ResultSetIterator(BlockingQueue<List<T>> aQueue) {
      queue = aQueue;
    }

    /**
     * {@inheritDoc}
     * 
     * This implementation marks the current thread as interrupted if streaming could not be commenced. 
     */
    @Override
    public boolean hasNext() {
      if ( exception != null ) { throw new UncheckedConnectionException("Exception while reading results", exception); }
      try {
        synchronized ( resultSet ) {
          awaitElements();
          if ( exception != null ) { 
            throw new UncheckedConnectionException("Exception while reading results", exception); 
          }
          if ( resultSet.isEmpty() ) {
            LOG.debug("iterator returned " + returned + " rows");
          }
          return !resultSet.isEmpty();
        }
      } catch ( InterruptedException e ) {
        Thread.currentThread().interrupt();
        return false;
      }
    
    }

    /**
     * {@inheritDoc}
     * 
     * If the current thread is interrupted during this call, it is marked as interrupted and the method returns 
     * {@code null}.
     * 
     * @throws NoSuchElementException if called while {@link #hasNext()} returns {@code false}
     */
    @Override
    public List<T> next() throws NoSuchElementException {
      if ( exception != null ) { throw new IllegalStateException("Exception while reading results", exception); }
      if ( !hasNext() ) { throw new NoSuchElementException(); }
      try {
        List<T> result = queue.take();
        synchronized ( resultSet ) { resultSet.notify(); }
        returned++;
        return result;
      } catch ( InterruptedException e ) {
        Thread.currentThread().interrupt();
        return null;
      }
    }

    /**
     * This method is not supported.
     */
    @Override public void remove() { throw new UnsupportedOperationException("RTFM"); }

    @Override void handle(Exception aException) {
      if ( exception != null ) { 
        LOG.warn("There was another exception.", aException);
      }
      exception = aException;
    }
  }
  
  private abstract static class ExceptionHandler {
    abstract void handle(Exception exception);
  }
  
  private static class ServerExceptionHandler extends ExceptionHandler {
    @Override
    void handle(Exception aException) {
      LOG.error(FATAL, "Exception while writing response.", aException);
    }
  }
}
{code}
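
The pipelining idea in setResults (a decoder thread fills a bounded queue while the caller iterates over it) can be shown in isolation. The following is only a minimal sketch under assumptions, not the code under test and not a proposed fix: all names are made up, and a hypothetical end-of-stream marker object takes the place of the receiving flag and the explicit wait/notify calls, since BlockingQueue.put and take already block.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration of a bounded producer/consumer hand-off.
final class PipelineSketch {
  // Hypothetical end-of-stream marker; compared by identity, never by equals()
  private static final List<String> END = new ArrayList<>();

  /** Passes all rows through a bounded queue from a producer thread to the caller. */
  static List<List<String>> transfer(List<List<String>> rows, int capacity)
      throws InterruptedException {
    BlockingQueue<List<String>> queue = new ArrayBlockingQueue<>(capacity);
    Thread producer = new Thread(() -> {
      try {
        for (List<String> row : rows) {
          queue.put(row);            // blocks while the queue is full
        }
        queue.put(END);              // signals that no more rows will follow
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    producer.start();

    List<List<String>> received = new ArrayList<>();
    // take() blocks while the queue is empty
    for (List<String> row = queue.take(); row != END; row = queue.take()) {
      received.add(row);
    }
    producer.join();
    return received;
  }

  public static void main(String[] args) throws InterruptedException {
    List<List<String>> rows = Arrays.asList(Arrays.asList("BA", "1"), Arrays.asList("BM"));
    System.out.println(transfer(rows, 1)); // prints [[BA, 1], [BM]]
  }
}
```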




