You are viewing a plain text version of this content. The canonical link for it is here.
Posted to derby-commits@db.apache.org by kr...@apache.org on 2010/04/22 18:45:22 UTC

svn commit: r936950 - in /db/derby/code/trunk/java: client/org/apache/derby/client/net/ drda/org/apache/derby/impl/drda/ engine/org/apache/derby/iapi/reference/ engine/org/apache/derby/loc/ shared/org/apache/derby/shared/common/reference/

Author: kristwaa
Date: Thu Apr 22 16:45:22 2010
New Revision: 936950

URL: http://svn.apache.org/viewvc?rev=936950&view=rev
Log:
DERBY-2017: Client driver can insert and commit partial data when a LOB stream throws IOException or does not match the specified length 

Made the client tell the server (if supported) if the EXTDTA transfer was
successful or not, by appending a Derby-specific status byte to the user data.
If the transfer was unsuccessful, an exception will be thrown on the server
side even if the data was padded by the client to avoid a protocol exception.
There are three types of errors that can happen on the client when reading
the user stream:
  a) the stream is shorter than specified
  b) the stream is longer than specified
  c) an IOException is thrown when reading the stream

The fix consists of logic to allow the client and the server to determine if
the status byte will be sent, code to append the status byte on the client, and
code on the server to parse / read the status byte.
Some code in DRDAConnThread was also refactored (mostly readAndSetExtParam).

Patch file: derby-2017-3c-fix.diff


Added:
    db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/FailingEXTDTAInputStream.java   (with props)
Modified:
    db/derby/code/trunk/java/client/org/apache/derby/client/net/NetConnection.java
    db/derby/code/trunk/java/client/org/apache/derby/client/net/NetDatabaseMetaData.java
    db/derby/code/trunk/java/client/org/apache/derby/client/net/Request.java
    db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/AppRequester.java
    db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/DDMReader.java
    db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/DRDAConnThread.java
    db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/EXTDTAReaderInputStream.java
    db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/LayerBStreamedEXTDTAReaderInputStream.java
    db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/StandardEXTDTAReaderInputStream.java
    db/derby/code/trunk/java/engine/org/apache/derby/iapi/reference/DRDAConstants.java
    db/derby/code/trunk/java/engine/org/apache/derby/loc/messages.xml
    db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/MessageId.java

Modified: db/derby/code/trunk/java/client/org/apache/derby/client/net/NetConnection.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/client/org/apache/derby/client/net/NetConnection.java?rev=936950&r1=936949&r2=936950&view=diff
==============================================================================
--- db/derby/code/trunk/java/client/org/apache/derby/client/net/NetConnection.java (original)
+++ db/derby/code/trunk/java/client/org/apache/derby/client/net/NetConnection.java Thu Apr 22 16:45:22 2010
@@ -1753,6 +1753,13 @@ public class NetConnection extends org.a
         return metadata.serverSupportsUDTs();
     }
 
+    protected final boolean serverSupportsEXTDTAAbort() {
+        NetDatabaseMetaData metadata =
+            (NetDatabaseMetaData) databaseMetaData_;
+
+        return metadata.serverSupportsEXTDTAAbort();
+    }
+
     /**
      * Checks whether the server supports locators for large objects.
      *

Modified: db/derby/code/trunk/java/client/org/apache/derby/client/net/NetDatabaseMetaData.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/client/org/apache/derby/client/net/NetDatabaseMetaData.java?rev=936950&r1=936949&r2=936950&view=diff
==============================================================================
--- db/derby/code/trunk/java/client/org/apache/derby/client/net/NetDatabaseMetaData.java (original)
+++ db/derby/code/trunk/java/client/org/apache/derby/client/net/NetDatabaseMetaData.java Thu Apr 22 16:45:22 2010
@@ -38,6 +38,14 @@ public class NetDatabaseMetaData extends
 
     /** True if the server supports UDTs */
     private boolean supportsUDTs_;
+
+    /**
+     * True if the server supports aborting a statement whilst transferring
+     * EXTDTA objects. Note that there are two types of aborts, depending on
+     * whether an object is being transferred to the server using DDM layer B
+     * streaming or not.
+     */
+    private boolean supportsEXTDTAAbort_;
     
     /** True if the server supports nanoseconds in timestamps */
     private boolean supportsTimestampNanoseconds_;
@@ -106,6 +114,9 @@ public class NetDatabaseMetaData extends
 
         supportsTimestampNanoseconds_ =
                 productLevel_.greaterThanOrEqualTo(10, 6, 0);
+
+        supportsEXTDTAAbort_ =
+                productLevel_.greaterThanOrEqualTo(10, 6, 0);
     }
 
     /**
@@ -145,4 +156,12 @@ public class NetDatabaseMetaData extends
     final boolean serverSupportsTimestampNanoseconds() {
         return supportsTimestampNanoseconds_;
     }
+
+    /**
+     * Check if server supports product specific EXTDTA abort protocol.
+     * @return {@code true} if the server supports this.
+     */
+    final boolean serverSupportsEXTDTAAbort() {
+        return supportsEXTDTAAbort_;
+    }
 }

Modified: db/derby/code/trunk/java/client/org/apache/derby/client/net/Request.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/client/org/apache/derby/client/net/Request.java?rev=936950&r1=936949&r2=936950&view=diff
==============================================================================
--- db/derby/code/trunk/java/client/org/apache/derby/client/net/Request.java (original)
+++ db/derby/code/trunk/java/client/org/apache/derby/client/net/Request.java Thu Apr 22 16:45:22 2010
@@ -278,6 +278,10 @@ public class Request {
     
     // We need to reuse the agent's sql exception accumulation mechanism
     // for this write exception, pad if the length is too big, and truncation if the length is too small
+    // WARNING: The code encrypting EXTDTA still has the problems described by
+    //          DERBY-2017. The server doesn't support this security mechanism
+    //          (see for instance DERBY-1345), and it is not clear whether this
+    //          piece of code is ever used.
     final private void writeEncryptedScalarStream(boolean chained,
                                                   boolean chainedWithSameCorrelator,
                                                   int codePoint,
@@ -285,9 +289,6 @@ public class Request {
                                                   java.io.InputStream in,
                                                   boolean writeNullByte,
                                                   int parameterIndex) throws DisconnectException, SqlException {
-        
-
-			
 		int leftToRead = length;
 		int extendedLengthByteCount = prepScalarStream(chained,
 													   chainedWithSameCorrelator,
@@ -321,7 +322,8 @@ public class Request {
 				bytesRead = in.read(clearedBytes, pos, leftToRead);
 				totalBytesRead += bytesRead;
 			} catch (java.io.IOException e) {
-				padScalarStreamForError(leftToRead, bytesToRead);
+                padScalarStreamForError(leftToRead, bytesToRead,
+                        false, (byte)-1);
 				// set with SQLSTATE 01004: The value of a string was truncated when assigned to a host variable.
 				netAgent_.accumulateReadException(new SqlException(netAgent_.logWriter_,
 																   new ClientMessageId(SQLState.NET_IOEXCEPTION_ON_READ),
@@ -440,8 +442,24 @@ public class Request {
     }
 	
 	
-	// We need to reuse the agent's sql exception accumulation mechanism
-    // for this write exception, pad if the length is too big, and truncation if the length is too small
+    /**
+     * Writes a stream with a known length onto the wire.
+     * <p>
+     * To avoid DRDA protocol exceptions, the data is truncated or padded as
+     * required to complete the transfer. This can be avoided by implementing
+     * the request abort mechanism specified by DRDA, but it is rather complex
+     * and may not be worth the trouble.
+     * <p>
+     * Also note that any exceptions generated while writing the stream will
+     * be accumulated and raised at a later time.
+     *
+     * @param length the byte length of the stream
+     * @param in the stream to transfer
+     * @param writeNullByte whether or not to write a NULL indicator
+     * @param parameterIndex one-based parameter index
+     * @throws DisconnectException if a severe error condition is encountered,
+     *      causing the connection to be broken
+     */
 	final private void writePlainScalarStream(boolean chained,
                                               boolean chainedWithSameCorrelator,
                                               int codePoint,
@@ -449,33 +467,49 @@ public class Request {
                                               java.io.InputStream in,
                                               boolean writeNullByte,
                                               int parameterIndex) throws DisconnectException, SqlException {
+        // We don't have the metadata available when we create this request
+        // object, so we have to check here if we are going to write the status
+        // byte or not.
+        final boolean writeEXTDTAStatusByte =
+                netAgent_.netConnection_.serverSupportsEXTDTAAbort();
+
+        // If the Derby specific status byte is sent, the number of bytes to
+        // send differs from the number of bytes to read (off by one byte).
 		int leftToRead = length;
+        int bytesToSend = writeEXTDTAStatusByte ? leftToRead + 1 : leftToRead;
 		int extendedLengthByteCount = prepScalarStream(chained,
 													   chainedWithSameCorrelator,
 													   writeNullByte,
-													   leftToRead);
+                                                       bytesToSend);
 		int bytesToRead;
 				
 		if (writeNullByte) {
-			bytesToRead = Math.min(leftToRead, DssConstants.MAX_DSS_LEN - 6 - 4 - 1 - extendedLengthByteCount);
+            bytesToRead = Math.min(bytesToSend, DssConstants.MAX_DSS_LEN - 6 - 4 - 1 - extendedLengthByteCount);
 		} else {
-			bytesToRead = Math.min(leftToRead, DssConstants.MAX_DSS_LEN - 6 - 4 - extendedLengthByteCount);
+            bytesToRead = Math.min(bytesToSend, DssConstants.MAX_DSS_LEN - 6 - 4 - extendedLengthByteCount);
 		}
-				
+
+        // If we are sending the status byte and we can send the user value as
+        // one DSS, correct for the status byte (otherwise we read one byte too
+        // much from the stream).
+        if (writeEXTDTAStatusByte && bytesToRead == bytesToSend) {
+            bytesToRead--;
+        }
+
 		buildLengthAndCodePointForLob(codePoint,
-									  leftToRead,
+                                      bytesToSend,
 									  writeNullByte,
 									  extendedLengthByteCount);
-
+        byte status = DRDAConstants.STREAM_OK;
 		int bytesRead = 0;
-		int totalBytesRead = 0;
 		do {
 			do {
 				try {
 					bytesRead = in.read(bytes_, offset_, bytesToRead);
-					totalBytesRead += bytesRead;
 				} catch (java.io.IOException e) {
-					padScalarStreamForError(leftToRead, bytesToRead);
+                    status = DRDAConstants.STREAM_READ_ERROR;
+                    padScalarStreamForError(leftToRead, bytesToRead,
+                            writeEXTDTAStatusByte, status);
 					// set with SQLSTATE 01004: The value of a string was truncated when assigned to a host variable.
 					netAgent_.accumulateReadException(new SqlException(
 																	   netAgent_.logWriter_,
@@ -487,7 +521,9 @@ public class Request {
 					return;
 				}
 				if (bytesRead == -1) {
-					padScalarStreamForError(leftToRead, bytesToRead);
+                    status = DRDAConstants.STREAM_TOO_SHORT;
+                    padScalarStreamForError(leftToRead, bytesToRead,
+                            writeEXTDTAStatusByte, status);
 					// set with SQLSTATE 01004: The value of a string was truncated when assigned to a host variable.
 					netAgent_.accumulateReadException(new SqlException(netAgent_.logWriter_,
 																	   new ClientMessageId(SQLState.NET_PREMATURE_EOS),
@@ -506,12 +542,14 @@ public class Request {
 		// check to make sure that the specified length wasn't too small
 		try {
 			if (in.read() != -1) {
+                status = DRDAConstants.STREAM_TOO_LONG;
 				// set with SQLSTATE 01004: The value of a string was truncated when assigned to a host variable.
 				netAgent_.accumulateReadException(new SqlException(netAgent_.logWriter_,
 																   new ClientMessageId(SQLState.NET_INPUTSTREAM_LENGTH_TOO_SMALL),
 																   new Integer(parameterIndex)));
 			}
 		} catch (java.io.IOException e) {
+            status = DRDAConstants.STREAM_READ_ERROR;
 			netAgent_.accumulateReadException(new SqlException(
 															   netAgent_.logWriter_,
 															   new ClientMessageId(
@@ -520,18 +558,50 @@ public class Request {
 															   e.getMessage(),
 															   e));
 		}
+        // Write the status byte to the send buffer.
+        if (writeEXTDTAStatusByte) {
+            writeEXTDTAStatus(status);
+        }
 	}
 
 
-    // We need to reuse the agent's sql exception accumulation mechanism
-    // for this write exception, pad if the length is too big, and truncation if the length is too small
+
+    /**
+     * Writes a stream with unknown length onto the wire.
+     * <p>
+     * To avoid DRDA protocol exceptions, the data is truncated or padded as
+     * required to complete the transfer. This can be avoided by implementing
+     * the request abort mechanism specified by DRDA, but it is rather complex
+     * and may not be worth the trouble.
+     * <p>
+     * Also note that any exceptions generated while writing the stream will
+     * be accumulated and raised at a later time.
+     * <p>
+     * <em>Implementation note:</em> This method does not support sending
+     * values with a specified length using layer B streaming and at the same
+     * time applying length checking. For large values layer B streaming may be
+     * more efficient than using layer A streaming.
+     *
+     * @param in the stream to transfer
+     * @param writeNullByte whether or not to write a NULL indicator
+     * @param parameterIndex one-based parameter index
+     * @throws DisconnectException if a severe error condition is encountered,
+     *      causing the connection to be broken
+     */
 	final private void writePlainScalarStream(boolean chained,
                                               boolean chainedWithSameCorrelator,
                                               int codePoint,
                                               java.io.InputStream in,
                                               boolean writeNullByte,
-                                              int parameterIndex) throws DisconnectException, SqlException {
+                                              int parameterIndex)
+            throws DisconnectException {
 		
+        // We don't have the metadata available when we create this request
+        // object, so we have to check here if we are going to write the status
+        // byte or not.
+        final boolean writeEXTDTAStatusByte =
+                netAgent_.netConnection_.serverSupportsEXTDTAAbort();
+
         in = new BufferedInputStream( in );
 
         flushExistingDSS();
@@ -582,10 +652,10 @@ public class Request {
                 }
                 
             }
-            
-            
         } catch (java.io.IOException e) {
-            
+            if (writeEXTDTAStatusByte) {
+                writeEXTDTAStatus(DRDAConstants.STREAM_READ_ERROR);
+            }
             final SqlException sqlex = 
                 new SqlException(netAgent_.logWriter_,
                                  new ClientMessageId(SQLState.NET_IOEXCEPTION_ON_READ),
@@ -597,30 +667,10 @@ public class Request {
             
 					return;
         }
-        
-        
-        
-		// check to make sure that the specified length wasn't too small
-		try {
-			if (in.read() != -1) {
-				// set with SQLSTATE 01004: The value of a string was truncated when assigned to a host variable.
 
-                final SqlException sqlex = 
-                    new SqlException(netAgent_.logWriter_,
-                                     new ClientMessageId(SQLState.NET_INPUTSTREAM_LENGTH_TOO_SMALL),
-                                     new Integer(parameterIndex));
-
-				netAgent_.accumulateReadException(sqlex);
-			}
-		} catch (java.io.IOException e) {
-			netAgent_.accumulateReadException(new SqlException(
-															   netAgent_.logWriter_,
-															   new ClientMessageId(
-																				   SQLState.NET_IOEXCEPTION_ON_STREAMLEN_VERIFICATION),
-															   new Integer(parameterIndex),
-															   e.getMessage(),
-															   e));
-		}
+        if (writeEXTDTAStatusByte) {
+            writeEXTDTAStatus(DRDAConstants.STREAM_OK);
+        }
 	}
 
 
@@ -773,9 +823,29 @@ public class Request {
     }
     
 
-    // the offset_ must not be updated when an error is encountered
-    // note valid data may be overwritten
-    protected final void padScalarStreamForError(int leftToRead, int bytesToRead) throws DisconnectException {
+    /**
+     * Pads a value with zeros until it has reached its defined length.
+     * <p>
+     * This functionality was introduced to handle the error situation where
+     * the actual length of the user stream is shorter than specified. To avoid
+     * DRDA protocol errors (or in this case a hang), we have to pad the data
+     * until the specified length has been reached. In a later increment the
+     * Derby-specific EXTDTA status flag was introduced to allow the client to
+     * inform the server that the value sent is invalid.
+     *
+     * @param leftToRead total number of bytes left to read
+     * @param bytesToRead remaining bytes to read before flushing
+     * @param writeStatus whether or not to wrote the Derby-specific trailing
+     *      EXTDTA status flag (see DRDAConstants)
+     * @param status the EXTDTA status (for this data value), ignored if
+     *      {@code writeStatus} is {@code false}
+     * @throws DisconnectException if flushing the buffer fails
+     */
+    protected final void padScalarStreamForError(int leftToRead,
+                                                 int bytesToRead,
+                                                 boolean writeStatus,
+                                                 byte status)
+            throws DisconnectException {
         do {
             do {
                 bytes_[offset_++] = (byte) (0x0); // use 0x0 as the padding byte
@@ -785,6 +855,11 @@ public class Request {
 
             bytesToRead = flushScalarStreamSegment(leftToRead, bytesToRead);
         } while (leftToRead > 0);
+
+        // Append the EXTDTA status flag if appropriate.
+        if (writeStatus) {
+            writeEXTDTAStatus(status);
+        }
     }
 
     private final void writeExtendedLengthBytes(int extendedLengthByteCount, long length) {
@@ -1758,7 +1833,25 @@ public class Request {
         }
         
     }
-    
+
+    /**
+     * Writes the Derby-specific EXTDTA status flag to the send buffer.
+     * <p>
+     * The existing buffer is flushed to make space for the flag if required.
+     *
+     * @param flag the Derby-specific EXTDTA status flag
+     * @throws DisconnectException if flushing the buffer fails
+     */
+    private void writeEXTDTAStatus(byte flag)
+            throws DisconnectException {
+        // Write the status byte to the send buffer.
+        // Make sure we have enough space for the status byte.
+        if (offset_ == bytes_.length) {
+            flushScalarStreamSegment(1, 0); // Trigger a flush.
+        }
+        bytes_[offset_++] = flag;
+        // The last byte will be sent on the next flush.
+    }
 
     public void setDssLengthLocation(int location) {
         dssLengthLocation_ = location;

Modified: db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/AppRequester.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/AppRequester.java?rev=936950&r1=936949&r2=936950&view=diff
==============================================================================
--- db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/AppRequester.java (original)
+++ db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/AppRequester.java Thu Apr 22 16:45:22 2010
@@ -295,6 +295,16 @@ class AppRequester
 		
 	}
 
+    /**
+     * Tells whether the client sends a trailing Derby-specific status byte
+     * when transferring EXTDTA objects.
+     *
+     * @return {@code true} if the status byte is sent, {@code false} if not
+     */
+    protected boolean supportsEXTDTAAbort() {
+        return (clientType == DNC_CLIENT && greaterThanOrEqualTo(10, 6, 0));
+    }
+
 	protected boolean supportsSessionDataCaching() {
 		return (clientType == DNC_CLIENT && greaterThanOrEqualTo(10, 4, 0));
 	}

Modified: db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/DDMReader.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/DDMReader.java?rev=936950&r1=936949&r2=936950&view=diff
==============================================================================
--- db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/DDMReader.java (original)
+++ db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/DDMReader.java Thu Apr 22 16:45:22 2010
@@ -992,13 +992,18 @@ class DDMReader
 	{
 		if (checkNullability && isEXTDTANull()) {
 			return null;
+        }
+
+        // Check if we must read the status byte sent by the client.
+        boolean readEXTDTAStatusByte =
+                agent.getSession().appRequester.supportsEXTDTAAbort();
             
-		} else if ( doingLayerBStreaming ){
-			return new LayerBStreamedEXTDTAReaderInputStream(this);
-        
+        if (doingLayerBStreaming) {
+            return new LayerBStreamedEXTDTAReaderInputStream(
+                    this, readEXTDTAStatusByte);
         } else {
-            return new StandardEXTDTAReaderInputStream(this);
-            
+            return new StandardEXTDTAReaderInputStream(
+                    this, readEXTDTAStatusByte);
         }
 
 	}

Modified: db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/DRDAConnThread.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/DRDAConnThread.java?rev=936950&r1=936949&r2=936950&view=diff
==============================================================================
--- db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/DRDAConnThread.java (original)
+++ db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/DRDAConnThread.java Thu Apr 22 16:45:22 2010
@@ -4987,7 +4987,7 @@ class DRDAConnThread extends Thread {
 
 	/**
 	 * Read different types of input parameters and set them in PreparedStatement
-	 * @param i			index of the parameter
+     * @param i zero-based index of the parameter
 	 * @param stmt			associated ps
 	 * @param drdaType	drda type of the parameter
 	 *
@@ -4998,127 +4998,52 @@ class DRDAConnThread extends Thread {
 									 int drdaType, int extLen, boolean streamLOB)
 				throws DRDAProtocolException, SQLException
 		{
-			PreparedStatement ps = stmt.getPreparedStatement();
+            // Note the switch from zero-based to one-based index below.
 			drdaType = (drdaType & 0x000000ff); // need unsigned value
 			boolean checkNullability = false;
 			if (sqlamLevel >= MGRLVL_7 &&
 				FdocaConstants.isNullable(drdaType))
 				checkNullability = true;
 
+            final EXTDTAReaderInputStream stream =
+                reader.getEXTDTAReaderInputStream(checkNullability);
+
+            // Determine encoding first, mostly for debug/tracing purposes
+            String encoding = "na";
+            switch (drdaType) {
+                case DRDAConstants.DRDA_TYPE_LOBCSBCS:
+                case DRDAConstants.DRDA_TYPE_NLOBCSBCS:
+                    encoding = stmt.ccsidSBCEncoding;
+                    break;
+                case DRDAConstants.DRDA_TYPE_LOBCDBCS:
+                case DRDAConstants.DRDA_TYPE_NLOBCDBCS:
+                    encoding = stmt.ccsidDBCEncoding;
+                    break;
+                case DRDAConstants.DRDA_TYPE_LOBCMIXED:
+                case DRDAConstants.DRDA_TYPE_NLOBCMIXED:
+                    encoding = stmt.ccsidMBCEncoding;
+                    break;
+            }
+
+            traceEXTDTARead(drdaType, i+1, stream, streamLOB, encoding);
+
 			try {	
-				final byte[] paramBytes;
-				final String paramString;
-				
 				switch (drdaType)
 				{
 					case  DRDAConstants.DRDA_TYPE_LOBBYTES:
 					case  DRDAConstants.DRDA_TYPE_NLOBBYTES:
-						paramString = "";
-						final boolean useSetBinaryStream = 
-							stmt.getParameterMetaData().getParameterType(i+1)==Types.BLOB;
-						
-						if (streamLOB && useSetBinaryStream) {
-							paramBytes = null;
-							final EXTDTAReaderInputStream stream = 
-								reader.getEXTDTAReaderInputStream(checkNullability);
-                            // Save the streamed parameter so we can drain it if it does not get used
-                            // by embedded when the statement is executed. DERBY-3085
-                            stmt.setStreamedParameter(stream);
-                            if( stream instanceof StandardEXTDTAReaderInputStream ){
-                                
-                                final StandardEXTDTAReaderInputStream stdeis = 
-                                    (StandardEXTDTAReaderInputStream) stream ;
-                                ps.setBinaryStream( i + 1, 
-                                                    stdeis, 
-                                                    (int) stdeis.getLength() );
-                                
-                            } else if( stream instanceof LayerBStreamedEXTDTAReaderInputStream ) {
-                                
-                                ( ( EnginePreparedStatement ) ps).setBinaryStream( i + 1, 
-                                                                                   stream);
-                                
-							} else if( stream == null ){
-                                ps.setBytes(i+1, null);
-                                
-                            } else {
-                                throw new IllegalStateException();
-                            }
-							
-							if (SanityManager.DEBUG) {
-								if (stream==null) {
-									trace("parameter value : NULL");
-								} else {
-									trace("parameter value will be streamed");
-								}
-							}
-						} else {
-                            final EXTDTAReaderInputStream stream = 
-								reader.getEXTDTAReaderInputStream(checkNullability);
-							
-                            if ( stream == null ) {
-								
-                                ps.setBytes(i+1, 
-                                            null );
-                                
-                                if (SanityManager.DEBUG) {
-									trace("parameter value : NULL");
-                                }
-                                
-							} else {
-
-                                ByteArrayInputStream bais = 
-                                    convertAsByteArrayInputStream( stream );
-                                
-                                if (SanityManager.DEBUG) {
-									trace("parameter value is a LOB with length:" +
-										  bais.available() );
-								}
-                                
-								ps.setBinaryStream(i+1, 
-                                                   bais,
-												   bais.available() );
-                                
-							}
-							
-						}
+                        setAsBinaryStream(stmt, i+1, stream, streamLOB);
 						break;
 					case DRDAConstants.DRDA_TYPE_LOBCSBCS:
 					case DRDAConstants.DRDA_TYPE_NLOBCSBCS:
-                        
-                        setAsCharacterStream(stmt,
-                                             i,
-                                             checkNullability,
-                                             reader,
-                                             streamLOB,
-                                             stmt.ccsidSBCEncoding );
-
-						break;
 					case DRDAConstants.DRDA_TYPE_LOBCDBCS:
 					case DRDAConstants.DRDA_TYPE_NLOBCDBCS:
-                        
-                        setAsCharacterStream(stmt,
-                                             i,
-                                             checkNullability,
-                                             reader,
-                                             streamLOB,
-                                             stmt.ccsidDBCEncoding);
-                        
-						break;
 					case DRDAConstants.DRDA_TYPE_LOBCMIXED:
 					case DRDAConstants.DRDA_TYPE_NLOBCMIXED:
-
-                        setAsCharacterStream(stmt,
-                                             i,
-                                             checkNullability,
-                                             reader,
-                                             streamLOB,
-                                             stmt.ccsidMBCEncoding);
-                        
+                        setAsCharacterStream(stmt, i+1, stream, streamLOB,
+                                encoding);
 						break;
 					default:
-						paramBytes = null;
-						paramString = "";
-
 						invalidValue(drdaType);
 				}
 			     
@@ -8281,6 +8206,41 @@ class DRDAConnThread extends Thread {
 			server.consoleMessage(value, true);
 	}
 
+
+    /**
+     * Sends a trace string to the console when reading an EXTDTA value (if
+     * tracing is enabled).
+     *
+     * @param drdaType the DRDA type of the EXTDTA value
+     * @param index the one-based parameter index
+     * @param stream the stream being read
+     * @param streamLOB whether or not the value is being streamed as the last
+     *      parameter value in the DRDA protocol flow
+     * @param encoding the encoding of the data, if any
+     */
+    private void traceEXTDTARead(int drdaType, int index,
+                                 EXTDTAReaderInputStream stream,
+                                 boolean streamLOB, String encoding) {
+        if (SanityManager.DEBUG && server.debugOutput == true) {
+            StringBuffer sb = new StringBuffer("Reading/setting EXTDTA: ");
+            // Data: t<type>/i<ob_index>/<streamLOB>/<encoding>/
+            //       <statusByteExpected>/b<byteLength>
+            sb.append("t").append(drdaType).append("/i").append(index).
+                    append("/").append(streamLOB).
+                    append("/").append(encoding).append("/").
+                    append(stream.readStatusByte). append("/b");
+            if (stream == null) {
+                sb.append("NULL");
+            } else if (stream.isLayerBStream()) {
+                sb.append("UNKNOWN_LENGTH");
+            } else {
+                sb.append(
+                        ((StandardEXTDTAReaderInputStream)stream).getLength());
+            }
+            trace(sb.toString());
+        }
+    }
+
 	/***
 	 * Show runtime memory
 	 *
@@ -8761,19 +8721,26 @@ class DRDAConnThread extends Thread {
     }
     
     
-    private static ByteArrayInputStream 
+    private static InputStream
         convertAsByteArrayInputStream( EXTDTAReaderInputStream stream )
         throws IOException {
-        
+
+        // Suppress the exception that may be thrown when reading the status
+        // byte here, we want the embedded statement to fail while executing.
+        stream.setSuppressException(true);
+
         final int byteArrayLength = 
             stream instanceof StandardEXTDTAReaderInputStream ?
             (int) ( ( StandardEXTDTAReaderInputStream ) stream ).getLength() : 
-            32;// default length
-        
+            1 + stream.available(); // +1 to avoid infinite loop
+
+        // TODO: We will run into OOMEs for large values here.
+        //       Could avoid this by saving value temporarily to disk, for
+        //       instance by using the existing LOB code.
         PublicBufferOutputStream pbos = 
             new PublicBufferOutputStream( byteArrayLength );
-        
-        byte[] buffer = new byte[32 * 1024];
+
+        byte[] buffer = new byte[Math.min(byteArrayLength, 32*1024)];
         
         int c = 0;
         
@@ -8783,9 +8750,17 @@ class DRDAConnThread extends Thread {
             pbos.write( buffer, 0, c );
         }
 
-        return new ByteArrayInputStream( pbos.getBuffer(),
-                                         0, 
-                                         pbos.getCount() );
+        // Check if the client driver encountered any errors when reading the
+        // source on the client side.
+        if (stream.isStatusSet() &&
+                stream.getStatus() != DRDAConstants.STREAM_OK) {
+            // Create a stream that will just fail when accessed.
+            return new FailingEXTDTAInputStream(stream.getStatus());
+        } else {
+            return new ByteArrayInputStream( pbos.getBuffer(),
+                                             0,
+                                             pbos.getCount() );
+        }
 
     }
     
@@ -8806,37 +8781,87 @@ class DRDAConnThread extends Thread {
         
     }
     
-    private static void setAsCharacterStream(DRDAStatement stmt,
-                                             int i,
-                                             boolean checkNullability,
-                                             DDMReader reader,
-                                             boolean streamLOB,
-                                             String encoding) 
-        throws DRDAProtocolException ,
-               SQLException ,
-               IOException {
+    /**
+     * Sets the specified character EXTDTA parameter of the embedded statement.
+     *
+     * @param stmt the DRDA statement to use
+     * @param i the one-based index of the parameter
+     * @param extdtaStream the EXTDTA stream to read data from
+     * @param streamLOB whether or not the stream content is streamed as the
+     *      last value in the DRDA protocol flow
+     * @param encoding the encoding of the EXTDTA stream
+     * @throws IOException if reading from the stream fails
+     * @throws SQLException if setting the stream fails
+     */
+    private static void setAsCharacterStream(
+                                         DRDAStatement stmt,
+                                         int i,
+                                         EXTDTAReaderInputStream extdtaStream,
+                                         boolean streamLOB,
+                                         String encoding)
+           throws IOException, SQLException {
         PreparedStatement ps = stmt.getPreparedStatement();
         EnginePreparedStatement engnps = 
             ( EnginePreparedStatement ) ps;
         
-        final EXTDTAReaderInputStream extdtastream = 
-            reader.getEXTDTAReaderInputStream(checkNullability);
         // DERBY-3085. Save the stream so it can be drained later
         // if not  used.
         if (streamLOB)
-            stmt.setStreamedParameter(extdtastream);
+            stmt.setStreamedParameter(extdtaStream);
         
         final InputStream is = 
             streamLOB ?
-            (InputStream) extdtastream :
-            convertAsByteArrayInputStream( extdtastream );
+            (InputStream) extdtaStream :
+            convertAsByteArrayInputStream( extdtaStream );
         
         final InputStreamReader streamReader = 
             new InputStreamReader( is,
                                    encoding ) ;
         
-        engnps.setCharacterStream( i + 1, 
-                                   streamReader );
+        engnps.setCharacterStream(i, streamReader);
     }
 
+    /**
+     * Sets the specified binary EXTDTA parameter of the embedded statement.
+     *
+     * @param stmt the DRDA statement to use
+     * @param index the one-based index of the parameter
+     * @param stream the EXTDTA stream to read data from
+     * @param streamLOB whether or not the stream content is streamed as the
+     *      last value in the DRDA protocol flow
+     * @throws IOException if reading from the stream fails
+     * @throws SQLException  if setting the stream fails
+     */
+    private static void setAsBinaryStream(DRDAStatement stmt,
+                                          int index,
+                                          EXTDTAReaderInputStream stream,
+                                          boolean streamLOB)
+            throws IOException, SQLException {
+        int type = stmt.getParameterMetaData().getParameterType(index);
+        boolean useSetBinaryStream = (type == Types.BLOB);
+        PreparedStatement ps = stmt.getPreparedStatement();
+
+        if (streamLOB && useSetBinaryStream) {
+            // Save the streamed parameter so we can drain it if it does not
+            // get used by embedded when the statement is executed. DERBY-3085
+            stmt.setStreamedParameter(stream);
+            if (stream == null) {
+                ps.setBytes(index, null);
+            } else if (!stream.isLayerBStream()) {
+                int length = (int)((StandardEXTDTAReaderInputStream)
+                                                            stream).getLength();
+                ps.setBinaryStream(index, stream, length);
+
+            } else {
+                ((EnginePreparedStatement)ps).setBinaryStream(index, stream);
+            }
+        } else {
+            if (stream == null) {
+                ps.setBytes(index, null);
+            } else {
+                InputStream bais = convertAsByteArrayInputStream(stream);
+                ps.setBinaryStream(index, bais, bais.available());
+            }
+        }
+    }
 }

Modified: db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/EXTDTAReaderInputStream.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/EXTDTAReaderInputStream.java?rev=936950&r1=936949&r2=936950&view=diff
==============================================================================
--- db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/EXTDTAReaderInputStream.java (original)
+++ db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/EXTDTAReaderInputStream.java Thu Apr 22 16:45:22 2010
@@ -19,15 +19,177 @@
    under the License.
 */
 package org.apache.derby.impl.drda;
-import java.io.InputStream;
-import java.io.ByteArrayInputStream;
+
 import java.io.IOException;
+import java.io.InputStream;
+import org.apache.derby.iapi.reference.DRDAConstants;
+
+import org.apache.derby.iapi.reference.SQLState;
+import org.apache.derby.iapi.services.i18n.MessageService;
+import org.apache.derby.iapi.services.io.DerbyIOException;
+import org.apache.derby.shared.common.reference.MessageId;
 
 /**
  * Implementation of InputStream which get EXTDTA from the DDMReader.
+ * <p>
  * This class can be used to stream LOBs from Network client to the
  * Network server.
+ * <p>
+ * To be able to correctly stream data from the client without reading the
+ * while value up front, a trailing Derby-specific status byte was introduced
+ * (version 10.6). It is used by the client to tell the server if the data it
+ * received was valid, or if it detected an error while streaming the data.
+ * The DRDA protocol, or at least Derby's implementation of it, doesn't enable
+ * the client to inform the server about the error whilst streaming (there is a
+ * mechanism in DRDA to interrupt a running request, but it didn't seem like a
+ * feasible approach in this case).
  */
-abstract class EXTDTAReaderInputStream extends InputStream 
-{
+abstract class EXTDTAReaderInputStream
+        extends InputStream {
+
+    /** Whether or not the subclass is a layer B stream. */
+    protected final boolean isLayerBStream;
+    /** Whether or not to read the trailing Derby-specific status byte. */
+    protected final boolean readStatusByte;
+    /**
+     * Tells if the status byte has been set.
+     *
+     * @see #checkStatus(int)
+     */
+    private boolean statusSet;
+    /**
+     * The status Derby-specific status byte, if any.
+     * @see #isStatusSet()
+     */
+    private byte status;
+    /**
+     * Whether or not to suppress the exception when an error is indicated by
+     * the status byte.
+     */
+    private boolean suppressException;
+
+    /**
+     * Initializes the class.
+     *
+     * @param layerB whether or not DDM layer B streaming is being used
+     * @param readStatusByte whether or not to read the trailing Derby-specific
+     *      status byte
+     */
+    protected EXTDTAReaderInputStream(boolean layerB, boolean readStatusByte) {
+        this.isLayerBStream = layerB;
+        this.readStatusByte = readStatusByte;
+    }
+
+    /**
+     * Saves the status byte read off the wire.
+     *
+     * @param status the status
+     * @see DRDAConstants
+     */
+    // Private for now, as the method is currently used only by checkStatus.
+    private void setStatus(int status) {
+        this.status = (byte)(status & 0xFF);
+        this.statusSet = true;
+    }
+
+    /**
+     * Returns whether the status has been set or not.
+     *
+     * @return {@code true} if set, {@code false} if not.
+     */
+    public boolean isStatusSet() {
+        return statusSet;
+    }
+
+    /**
+     * Returns the status byte.
+     * <p>
+     * <em>NOTE:</em> Check if the status byte has been set by calling
+     * {@linkplain #isStatusSet()}.
+     *
+     * @return The status byte.
+     */
+    public byte getStatus() {
+        if (!statusSet) {
+            throw new IllegalStateException("status hasn't been set");
+        }
+        return status;
+    }
+
+    /**
+     * Sets whether or not to suppress the exception when setting the status.
+     *
+     * @param flag {@code true} to suppress, {@code false} to throw exception
+     *      if an error condition is indicated by the status flag
+     */
+    void setSuppressException(boolean flag) {
+        this.suppressException = flag;
+    }
+
+    public boolean isLayerBStream() {
+        return isLayerBStream;
+    }
+
+    /**
+     * Interprets the Derby-specific status byte, and throws an exception if an
+     * error condition has been detected on the client.
+     *
+     * @param clientStatus the status flag sent by the client
+     * @throws IOException if the status byte indicates an error condition
+     */
+    protected void checkStatus(int clientStatus)
+            throws IOException {
+        // Note that in some cases we don't want to throw an exception here
+        // even if the status byte tells us an exception happened on the client
+        // side when reading the data stream. This is because sometimes EXTDTAs
+        // are // fully read before they are passed to the statement. If we
+        // throw the exception here, we cause DRDA protocol errors (it would
+        // probably be possible to code around this, but it is far easier to
+        // just have the embedded statement execution fail).
+
+        setStatus(clientStatus);
+        if (!suppressException && status != DRDAConstants.STREAM_OK) {
+            // Ask the sub-class to clean up.
+            onClientSideStreamingError();
+            throwEXTDTATransferException(clientStatus);
+        }
+    }
+
+    /**
+     * Performs necessary clean up when an error is signalled by the client.
+     */
+    protected abstract void onClientSideStreamingError();
+
+    /**
+     * Throws an exception as mandated by the EXTDTA status byte.
+     *
+     * @param status the EXTDTA status byte received from the client, should
+     *      not be {@linkplain DRDAConstants#STREAM_OK}
+     * @throws IOException the exception generated based on the status byte
+     */
+    static void throwEXTDTATransferException(int status)
+            throws IOException {
+        switch (status) {
+            case DRDAConstants.STREAM_READ_ERROR:
+                throw new IOException(
+                        MessageService.getTextMessage(
+                            MessageId.STREAM_DRDA_CLIENTSIDE_EXTDTA_READ_ERROR)
+                         );
+            case DRDAConstants.STREAM_TOO_SHORT:
+            case DRDAConstants.STREAM_TOO_LONG:
+                throw new DerbyIOException(
+                        MessageService.getTextMessage(
+                            SQLState.SET_STREAM_INEXACT_LENGTH_DATA),
+                        SQLState.SET_STREAM_INEXACT_LENGTH_DATA);
+            case DRDAConstants.STREAM_OK:
+                // Safe-guard, this method should not be invoked when the
+                // transfer was successful.
+                throw new IllegalStateException(
+                        "throwEXTDTATransferException invoked with EXTDTA " +
+                        "status byte STREAM_OK");
+            default:
+                throw new IOException(
+                        "Invalid stream EXTDTA status code: " + status);
+        }
+    }
 }

Added: db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/FailingEXTDTAInputStream.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/FailingEXTDTAInputStream.java?rev=936950&view=auto
==============================================================================
--- db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/FailingEXTDTAInputStream.java (added)
+++ db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/FailingEXTDTAInputStream.java Thu Apr 22 16:45:22 2010
@@ -0,0 +1,53 @@
+/*
+   Derby - Class org.apache.derby.impl.drda.FailingEXTDTAReaderInputStream
+
+   Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.
+*/
+package org.apache.derby.impl.drda;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A stream class that throws an exception on the first read request.
+ */
+public final class FailingEXTDTAInputStream
+        extends InputStream {
+
+    /** The status byte used to determine which exception to throw. */
+    private final byte extdtaStatus;
+
+    public FailingEXTDTAInputStream(byte extdtaStatus) {
+        this.extdtaStatus = extdtaStatus;
+    }
+
+    /**
+     * Throws an exception.
+     *
+     * @return n/a
+     * @throws IOException The exception to throw as dictated by the status
+     *      byte sent by the client driver when reading user data and sending
+     *      it as EXTDTA.
+     */
+    public int read()
+            throws IOException {
+        EXTDTAReaderInputStream.throwEXTDTATransferException(extdtaStatus);
+        // Should never get this far, but just in case...
+        throw new IllegalStateException("programming error - EXTDTA status");
+    }
+}

Propchange: db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/FailingEXTDTAInputStream.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/LayerBStreamedEXTDTAReaderInputStream.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/LayerBStreamedEXTDTAReaderInputStream.java?rev=936950&r1=936949&r2=936950&view=diff
==============================================================================
--- db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/LayerBStreamedEXTDTAReaderInputStream.java (original)
+++ db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/LayerBStreamedEXTDTAReaderInputStream.java Thu Apr 22 16:45:22 2010
@@ -38,13 +38,16 @@ final class LayerBStreamedEXTDTAReaderIn
     /**
      * Constructor
      * @param reader The reader to get data from
+     * @param readStatusByte whether or not to read the trailing Derby-specific
+     *      EXTDTA stream status byte
      * @exception DRDAProtocolException if thrown while initializing current 
      *                                  buffer.
      */
-    LayerBStreamedEXTDTAReaderInputStream(final DDMReader reader) 
+    LayerBStreamedEXTDTAReaderInputStream(final DDMReader reader,
+                                          boolean readStatusByte)
         throws DRDAProtocolException
     {
-        super();
+        super(true, readStatusByte);
         this.reader = reader;
         this.currentBuffer = 
             reader.readLOBInitStream();
@@ -63,13 +66,11 @@ final class LayerBStreamedEXTDTAReaderIn
      * @see        java.io.InputStream#read()
      */
     public final int read() 
-        throws IOException
-    {
-        int val = (currentBuffer == null) ? -1 : currentBuffer.read();
-        if (val < 0) {
-            val = refreshCurrentBuffer();
-        }
-        return val;
+            throws IOException {
+        // Reuse the other read method for simplicity.
+        byte[] b = new byte[1];
+        int read = read(b);
+        return (read == 1 ? b[0] : -1);
     }
     
     /**
@@ -95,20 +96,48 @@ final class LayerBStreamedEXTDTAReaderIn
      * @see        java.io.InputStream#read(byte[], int, int)
      */
     public final int read(final byte[] b,
-                          final int off,
-                          final int len) 
+                          int off,
+                          int len) 
         throws IOException
     {
-        int val = currentBuffer.read(b, off, len);
-        
-        if (val < 0 && 
-            reader.doingLayerBStreaming() ) {
-            
-            currentBuffer = 
-                reader.readLOBContinuationStream();
+        if (currentBuffer == null) {
+            return -1;
+        }
+
+        // WARNING: We are relying on ByteArrayInputStream.available below.
+        //          Replacing the stream class with another stream class may
+        //          not give expected results.
+
+        int val;
+        if (reader.doingLayerBStreaming()) {
+            // Simple read, we will either read part of the current buffer or
+            // all of it. We know there is at least one more byte on the wire.
+            val = currentBuffer.read(b, off, len);
+            if (currentBuffer.available() == 0) {
+                currentBuffer = reader.readLOBContinuationStream();
+            }
+        } else if (readStatusByte) {
+            // Reading from the last buffer, make sure we handle the Derby-
+            // specific status byte and that we don't return it to the user.
+            int maxToRead = currentBuffer.available() -1;
+            val = currentBuffer.read(b, off, Math.min(maxToRead, len));
+            if (maxToRead == 0) {
+                // Only status byte left.
+                checkStatus(currentBuffer.read());
+                val = -1;
+                currentBuffer = null;
+            } else if (maxToRead == val) {
+                checkStatus(currentBuffer.read());
+                currentBuffer = null;
+            }
+        } else {
+            // Reading from the last buffer, no Derby-specific status byte sent.
             val = currentBuffer.read(b, off, len);
-            
+            if (currentBuffer.available() == 0) {
+                currentBuffer = null;
+            }
         }
+
         return val;
     }
 
@@ -123,32 +152,24 @@ final class LayerBStreamedEXTDTAReaderIn
      * @return     the number of bytes that can be read from this input stream
      *             without blocking.     
      */
-    public final int available() 
-    {
-        return currentBuffer.available();
+    public final int available() {
+        int avail = 0;
+        if (currentBuffer != null) {
+            avail = currentBuffer.available();
+            if (readStatusByte && !reader.doingLayerBStreaming()) {
+                avail--;
+            }
+        }
+        return avail;
     }
 
     
-    /**
-     * Refresh the current buffer from the DDMReader
-     * @exception IOException if there is a IOException when
-     *                        refreshing the buffer from DDMReader
-     * @return the next byte of data, or <code>-1</code> if the end of the
-     *         stream is reached and layer B streaming was finished.
-     */
-    private int refreshCurrentBuffer() 
-        throws IOException
-    {
-        
-        if( ! reader.doingLayerBStreaming() )
-            return -1;
-        
-        currentBuffer = 
-            reader.readLOBContinuationStream();
-        return currentBuffer.read();
+    protected void onClientSideStreamingError() {
+        // Clean state and return -1 on subsequent calls.
+        // The status byte is the last byte, so no need to drain the source.
+        currentBuffer = null;
     }
     
-    
     /** DDMReader. Used to get more data. */
     private final DDMReader reader;
     

Modified: db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/StandardEXTDTAReaderInputStream.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/StandardEXTDTAReaderInputStream.java?rev=936950&r1=936949&r2=936950&view=diff
==============================================================================
--- db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/StandardEXTDTAReaderInputStream.java (original)
+++ db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/StandardEXTDTAReaderInputStream.java Thu Apr 22 16:45:22 2010
@@ -19,9 +19,10 @@
    under the License.
 */
 package org.apache.derby.impl.drda;
-import java.io.InputStream;
+
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 
 /**
  * Implementation of InputStream which get EXTDTA from the DDMReader.
@@ -31,20 +32,31 @@ import java.io.IOException;
 final class StandardEXTDTAReaderInputStream extends EXTDTAReaderInputStream 
 {
     /**
-     * Constructor
+     * Constructor.
+     *
      * @param reader The reader to get data from
+     * @param readStatusByte whether or not to read the trailing Derby-specific
+     *      EXTDTA stream status byte
      * @exception DRDAProtocolException if thrown while initializing current 
      *                                  buffer.
      */
-    StandardEXTDTAReaderInputStream(final DDMReader reader) 
+    StandardEXTDTAReaderInputStream(final DDMReader reader,
+                                    boolean readStatusByte)
         throws DRDAProtocolException
     {
-        super();
+        super(false, readStatusByte);
         this.reader = reader;
-        this.length = reader.getDdmLength();        
-        this.remainingBytes = length;
+        // Exclude the status byte in the byte count.
+        if (readStatusByte) {
+            this.remainingBytes = reader.getDdmLength() -1;
+        } else {
+            this.remainingBytes = reader.getDdmLength();
+        }
+        this.length = remainingBytes;
+        // Make sure we read the product specific extension byte off the wire.
+        // It will be read here if the value fits into a single DSS.
         this.currentBuffer = 
-            reader.readLOBInitStream(remainingBytes);
+            reader.readLOBInitStream(remainingBytes + (readStatusByte ? 1 : 0));
     }
 
     /**
@@ -60,19 +72,13 @@ final class StandardEXTDTAReaderInputStr
      * @see        java.io.InputStream#read()
      */
     public final int read() 
-        throws IOException
-    {
-        if (remainingBytes <= 0) {
-            return -1;
-        }
-        int val = (currentBuffer == null) ? -1 : currentBuffer.read();
-        if (val < 0) {
-            val = refreshCurrentBuffer();
-        }
-        remainingBytes--;
-        return val;
+            throws IOException {
+        // Reuse the other read method for simplicity.
+        byte[] b = new byte[1];
+        int read = read(b);
+        return (read == 1 ? b[0] : -1);
     }
-    
+
     /**
      * Reads up to <code>len</code> bytes of data from the input stream into
      * an array of bytes.  An attempt is made to read as many as
@@ -97,18 +103,32 @@ final class StandardEXTDTAReaderInputStr
      */
     public final int read(final byte[] b,
                           final int off,
-                          final int len) 
+                          int len) 
         throws IOException
     {
         if (remainingBytes <= 0) {
             return -1;
         }
+        // Adjust length to avoid reading the trailing status byte.
+        len = (int)Math.min(remainingBytes, (long)len);
         int val = currentBuffer.read(b, off, len);
         if (val < 0) {
-            currentBuffer = 
-                reader.readLOBContinuationStream(remainingBytes);
+            nextBuffer();
             val = currentBuffer.read(b, off, len);
         }
+        // If we are reading the last data byte, check the status byte.
+        if (readStatusByte && val == remainingBytes) {
+            if (currentBuffer.available() == 0) {
+                // Fetch the last buffer (containing only the status byte).
+                nextBuffer();
+            }
+            checkStatus(currentBuffer.read());
+            // Sanity check.
+            if (currentBuffer.read() != -1) {
+                throw new IllegalStateException(
+                        "Remaining bytes in buffer after status byte");
+            }
+        }
         remainingBytes -= val;
         return val;
     }
@@ -129,13 +149,22 @@ final class StandardEXTDTAReaderInputStr
         if (remainingBytes <= 0) {
             return 0;
         }
-        return currentBuffer.available();
+        int inBuffer = currentBuffer.available();
+        // Adjust for the status byte if required.
+        if (readStatusByte && inBuffer > remainingBytes) {
+            inBuffer--;
+        }
+        return inBuffer;
     }
 
     /**
-     * Return the length if this stream. The length includes data which has 
-     * been read.
-     * @return length of this stream.
+     * Returns the number of bytes returned by this stream.
+     * <p>
+     * The length includes data which has been already read at the invocation
+     * time, but doesn't include any meta data (like the Derby-specific
+     * EXTDTA status byte).
+     *
+     * @return The number of bytes that will be returned by this stream.
      */
     final long getLength() 
     {
@@ -143,33 +172,36 @@ final class StandardEXTDTAReaderInputStr
     }
     
     /**
-     * Refresh the current buffer from the DDMReader
-     * @exception IOException if there is a IOException when
-     *                        refreshing the buffer from DDMReader
-     * @return the next byte of data, or <code>-1</code> if the end of the
-     *         stream is reached.
+     * Fetches the next buffer.
+     *
+     * @throws IOException if fetching the buffer fails
      */
-    private int refreshCurrentBuffer() 
-        throws IOException
-    {
-        if (remainingBytes > 0) {
-            currentBuffer = 
-                reader.readLOBContinuationStream(remainingBytes);
-            return currentBuffer.read();
-        } else {
-            return -1;
-        }
+    private void nextBuffer()
+            throws IOException {
+        // Make sure we read the status byte off the wire if it was sent.
+        long wireBytes = readStatusByte ? remainingBytes +1 : remainingBytes;
+        currentBuffer = reader.readLOBContinuationStream(wireBytes);
     }
-    
+
+    /**
+     * Cleans up and closes the stream.
+     */
+    protected void onClientSideStreamingError() {
+        // Clean state and return -1 on subsequent calls.
+        // The status byte is the last byte, so no need to drain the source.
+        currentBuffer = null;
+        remainingBytes = -1;
+    }
+
     /** Length of stream */
     private final long length;
     
     /** DDMReader. Used to get more data. */
     private final DDMReader reader;
-    
+
     /** Remaining bytes in stream */
     private long remainingBytes;
-    
+
     /** Current data buffer */
     private ByteArrayInputStream currentBuffer;
 

Modified: db/derby/code/trunk/java/engine/org/apache/derby/iapi/reference/DRDAConstants.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/iapi/reference/DRDAConstants.java?rev=936950&r1=936949&r2=936950&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/iapi/reference/DRDAConstants.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/iapi/reference/DRDAConstants.java Thu Apr 22 16:45:22 2010
@@ -236,4 +236,19 @@ public	interface	DRDAConstants
     public   static final int DRDA_TIMESTAMP_LENGTH = 26;
     public   static final int JDBC_TIMESTAMP_LENGTH = 29;
 
+    // Values for the EXTDTA stream status byte.
+    // The use of this status byte is a product specific extension. The same
+    // goes for the values below, they are not described by DRDA (nor DDM).
+
+    /** Constant indicating a valid stream transfer. */
+    public static final byte STREAM_OK = 0x7F;
+    /**
+     * Constant indicating that the client encountered an error when reading
+     * the user stream.
+     */
+    public static final byte STREAM_READ_ERROR = 0x01;
+    /** Constant indicating that the user stream was too short. */
+    public static final byte STREAM_TOO_SHORT = 0x02;
+    /** Constant indicating that the user stream was too long. */
+    public static final byte STREAM_TOO_LONG = 0x04;
 }

Modified: db/derby/code/trunk/java/engine/org/apache/derby/loc/messages.xml
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/loc/messages.xml?rev=936950&r1=936949&r2=936950&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/loc/messages.xml (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/loc/messages.xml Thu Apr 22 16:45:22 2010
@@ -7595,6 +7595,11 @@ Shutting down instance {0} with class lo
                 <text>No mark set, or mark read ahead limit exceeded.</text>
             </msg>
 
+            <msg>
+                <name>I028</name>
+                <text>Stream read error on client side when transferring user data to server.</text>
+            </msg>
+
         </family>
 
 

Modified: db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/MessageId.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/MessageId.java?rev=936950&r1=936949&r2=936950&view=diff
==============================================================================
--- db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/MessageId.java (original)
+++ db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/MessageId.java Thu Apr 22 16:45:22 2010
@@ -193,6 +193,11 @@ public interface MessageId {
      * read ahead limit of the mark was exceeded.
      */
     String STREAM_MARK_UNSET_OR_EXCEEDED                    = "I027";
+    /**
+     * Error message when the client encounters a read error when reading a
+     * user stream, which it is in the process of sending to the server.
+     */
+    String STREAM_DRDA_CLIENTSIDE_EXTDTA_READ_ERROR         = "I028";
 
     /*
      * Monitor