You are viewing a plain text version of this content. The canonical link for it is here.
Posted to derby-commits@db.apache.org by an...@apache.org on 2006/08/30 15:06:06 UTC

svn commit: r438478 - in /db/derby/code/trunk/java/drda/org/apache/derby/impl/drda: DDMReader.java DRDAConnThread.java EXTDTAReaderInputStream.java

Author: andreask
Date: Wed Aug 30 06:06:05 2006
New Revision: 438478

URL: http://svn.apache.org/viewvc?rev=438478&view=rev
Log:
DERBY-1559 When receiving a single EXTDTA object representing a BLOB, the server does not need to read it into memory before inserting it into the DB. This patch reduces memory usage from 350 MB to 40 MB in the network server VM when inserting a 64 MB BLOB.

Added:
    db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/EXTDTAReaderInputStream.java   (with props)
Modified:
    db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/DDMReader.java
    db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/DRDAConnThread.java

Modified: db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/DDMReader.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/DDMReader.java?rev=438478&r1=438477&r2=438478&view=diff
==============================================================================
--- db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/DDMReader.java (original)
+++ db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/DDMReader.java Wed Aug 30 06:06:05 2006
@@ -24,6 +24,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.ByteArrayInputStream;
 import java.math.BigDecimal;
 
 /**
@@ -945,6 +946,85 @@
 		return  getExtData(ddmScalarLen, checkNullability);
 	}
 
+	
+	/**
+	 * Creates an InputStream which can stream a single EXTDTA object.
+	 * The InputStream uses this DDMReader to read data from the network. The
+	 * DDMReader should not be used before all data in the stream has been read.
+	 * @param checkNullability if true, isEXTDTANull() is checked first and
+	 *        this method returns null when it reports a null value
+	 * @return EXTDTAReaderInputStream object which can be passed to a prepared
+	 *         statement as a binary stream, or null (see checkNullability).
+	 * @exception DRDAProtocolException standard DRDA protocol exception
+	 */
+	EXTDTAReaderInputStream getEXTDTAReaderInputStream
+		(final boolean checkNullability)
+		throws DRDAProtocolException
+	{
+		if (checkNullability && isEXTDTANull()) {
+			return null;
+		} else {
+			return new EXTDTAReaderInputStream(this);
+		}
+	}
+
+	/**
+	 * Used by EXTDTAReaderInputStream to read the first chunk of data;
+	 * delegates to readLOBChunk without reading a DSS continuation header.
+	 * @param desiredLength the desired (maximum) length of the chunk
+	 * @exception DRDAProtocolException standard DRDA protocol exception
+	 */
+	ByteArrayInputStream readLOBInitStream(final long desiredLength) 
+		throws DRDAProtocolException
+	{
+		return readLOBChunk(false, desiredLength);
+	}
+	
+	/**
+	 * Used by EXTDTAReaderInputStream to read the next chunk of data, after
+	 * first reading the DSS continuation header.
+	 * @param desiredLength the desired (maximum) length of the chunk
+	 * @exception IOException carries the message of any DRDAProtocolException
+	 */
+	ByteArrayInputStream readLOBContinuationStream (final long desiredLength)
+		throws IOException
+	{		
+		try {
+			return readLOBChunk(true, desiredLength);
+		} catch (DRDAProtocolException e) {
+			e.printStackTrace(agent.getServer().logWriter); // full trace goes to the server log writer
+			throw new IOException(e.getMessage()); // NOTE(review): the cause is not chained
+		}
+	}
+
+	/**
+	 * Reads one chunk of EXTDTA data and returns it as a stream over this
+	 * reader's buffer; the chunk is min(dssLength, desiredLength) bytes long.
+	 * @param readHeader set to true if the dss continuation should be read
+	 * @param desiredLength the desired (maximum) length of the chunk
+	 * @exception DRDAProtocolException standard DRDA protocol exception
+	 */
+	private ByteArrayInputStream readLOBChunk
+		(final boolean readHeader, final long desiredLength)
+		throws DRDAProtocolException
+	{		
+		if (readHeader) {			
+			readDSSContinuationHeader();
+		}
+		int copySize = (int) Math.min(dssLength, desiredLength);
+		
+		// read the segment
+		ensureALayerDataInBuffer (copySize);
+		adjustLengths (copySize);
+		
+		// Wrap the region [pos, pos + copySize) of buffer in a stream.
+		// ByteArrayInputStream does not make a copy of the array it is given.
+		ByteArrayInputStream bais = 
+			new ByteArrayInputStream(buffer, pos, copySize);
+		pos += copySize;
+		
+		return bais;
+	}
 
 	byte[] getExtData (long desiredLength, boolean checkNullability) throws DRDAProtocolException
   {

Modified: db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/DRDAConnThread.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/DRDAConnThread.java?rev=438478&r1=438477&r2=438478&view=diff
==============================================================================
--- db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/DRDAConnThread.java (original)
+++ db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/DRDAConnThread.java Wed Aug 30 06:06:05 2006
@@ -1976,7 +1976,7 @@
 						break;
 					// optional
 					case CodePoint.EXTDTA:	
-						readAndSetAllExtParams(stmt);
+						readAndSetAllExtParams(stmt, false);
 						break;
 					default:
 						invalidCodePoint(codePoint);
@@ -3778,24 +3778,28 @@
 		// set the statement as the current statement
 		database.setCurrentStatement(stmt);
 		
-		
-		if (reader.isChainedWithSameID())
-			parseEXCSQLSTTobjects(stmt);
-		else if (isProcedure  && (needPrepareCall))
+		boolean hasResultSet;
+		if (reader.isChainedWithSameID()) 
+		{
+			hasResultSet = parseEXCSQLSTTobjects(stmt);
+		} else 
 		{
-			// if we had parameters the callable statement would
-			// be prepared with parseEXCQLSTTobjects, otherwise we
-			// have to do it here
-			String prepareString = "call " + stmt.procName +"()";
-			if (SanityManager.DEBUG) 
-				trace ("$$$prepareCall is: "+prepareString);
-			database.getConnection().clearWarnings();
-			CallableStatement cs = (CallableStatement) stmt.prepare(prepareString);
+			if (isProcedure  && (needPrepareCall))
+			{
+				// if we had parameters the callable statement would
+				// be prepared with parseEXCQLSTTobjects, otherwise we
+				// have to do it here
+				String prepareString = "call " + stmt.procName +"()";
+				if (SanityManager.DEBUG) 
+					trace ("$$$prepareCall is: "+prepareString);
+				database.getConnection().clearWarnings();
+				CallableStatement cs = (CallableStatement) stmt.prepare(prepareString);
+			}
+			stmt.ps.clearWarnings();
+			hasResultSet = stmt.execute();
 		}
-
-		stmt.ps.clearWarnings();
-
-		boolean hasResultSet = stmt.execute();
+		
+		
 		ResultSet rs = null;
 		if (hasResultSet)
 		{
@@ -3922,10 +3926,11 @@
 	 * @throws DRDAProtocolException
      * @throws SQLException
 	 */
-	private void parseEXCSQLSTTobjects(DRDAStatement stmt) throws DRDAProtocolException, SQLException
+	private boolean parseEXCSQLSTTobjects(DRDAStatement stmt) throws DRDAProtocolException, SQLException
 	{
 		int codePoint;
-		boolean gotSQLDTA = false, typeDefChanged = false;
+		boolean gotSQLDTA = false, gotEXTDTA = false;
+		boolean result = false;
 		do
 		{
 			correlationID = reader.readDssHeader();
@@ -3937,12 +3942,12 @@
 					// optional
 					case CodePoint.TYPDEFNAM:
 						setStmtOrDbByteOrder(false, stmt, parseTYPDEFNAM());
-						typeDefChanged = true;
+						stmt.setTypDefValues();
 						break;
 					// optional
 					case CodePoint.TYPDEFOVR:
 						parseTYPDEFOVR(stmt);
-						typeDefChanged = true;
+						stmt.setTypDefValues();
 						break;
 					// required
 					case CodePoint.SQLDTA:
@@ -3951,7 +3956,10 @@
 						break;
 					// optional
 					case CodePoint.EXTDTA:	
-						readAndSetAllExtParams(stmt);
+						readAndSetAllExtParams(stmt, true);
+						stmt.ps.clearWarnings();
+						result = stmt.execute();
+						gotEXTDTA = true;
 						break;
 					// optional
 					case CodePoint.OUTOVR:
@@ -3966,8 +3974,13 @@
 		// SQLDTA is required
 		if (! gotSQLDTA)
 			missingCodePoint(CodePoint.SQLDTA);
-		if (typeDefChanged)
-			stmt.setTypDefValues();
+		
+		if (! gotEXTDTA) {
+			stmt.ps.clearWarnings();
+			result = stmt.execute();
+		}
+		
+		return result;
 	}
 
 	/**
@@ -4206,7 +4219,7 @@
 					stmt.cliParamLens = paramLens;	
 					break;
 				case CodePoint.EXTDTA:
-					readAndSetAllExtParams(stmt);
+					readAndSetAllExtParams(stmt, false);
 					break;
 				default:
 					invalidCodePoint(codePoint);
@@ -4453,16 +4466,19 @@
 	}
 	
 
-	private void readAndSetAllExtParams(DRDAStatement stmt) 
+	private void readAndSetAllExtParams(final DRDAStatement stmt, final boolean streamLOB) 
 		throws SQLException, DRDAProtocolException
 	{
 		int numExt = stmt.cliParamExtPositions.size();
 		for (int i = 0; i < stmt.cliParamExtPositions.size(); i++)
 					{
 						int paramPos = ((Integer) (stmt.cliParamExtPositions).get(i)).intValue();
+						final boolean doStreamLOB = (streamLOB && i == numExt -1);
 						readAndSetExtParam(paramPos,
 										   stmt,
-										   ((Byte)stmt.cliParamDrdaTypes.elementAt(paramPos)).intValue(),((Integer)(stmt.cliParamLens.elementAt(paramPos))).intValue());
+										   ((Byte)stmt.cliParamDrdaTypes.elementAt(paramPos)).intValue(),
+										   ((Integer)(stmt.cliParamLens.elementAt(paramPos))).intValue(),
+										   doStreamLOB);
 						// Each extdta in it's own dss
 						if (i < numExt -1)
 						{
@@ -4484,7 +4500,7 @@
      * @throws SQLException
 	 */
 	private void readAndSetExtParam( int i, DRDAStatement stmt,
-									  int drdaType, int extLen)
+									 int drdaType, int extLen, boolean streamLOB)
 				throws DRDAProtocolException, SQLException
 		{
 			PreparedStatement ps = stmt.getPreparedStatement();
@@ -4495,24 +4511,55 @@
 				checkNullability = true;
 	
 			try {	
-				byte[] paramBytes = reader.getExtData(checkNullability);
-				String paramString = null;
+				final byte[] paramBytes;
+				final String paramString;
+				
 				switch (drdaType)
 				{
 					case  DRDAConstants.DRDA_TYPE_LOBBYTES:
 					case  DRDAConstants.DRDA_TYPE_NLOBBYTES:
-						if (SanityManager.DEBUG) {
-							if (paramBytes==null) {
-								trace("parameter value is NULL (LOB)");
+						paramString = "";
+						final boolean useSetBinaryStream = 
+							stmt.getParameterMetaData().getParameterType(i+1)==Types.BLOB;
+						
+						if (streamLOB && useSetBinaryStream) {
+							paramBytes = null;
+							final EXTDTAReaderInputStream stream = 
+								reader.getEXTDTAReaderInputStream(checkNullability);
+							if (stream==null) {
+								ps.setBytes(i+1, null);
 							} else {
-								trace("parameter value is a LOB with length: " + 
-									  paramBytes.length);
+								ps.setBinaryStream(i+1, stream, (int) stream.getLength());
+							}
+							
+							if (SanityManager.DEBUG) {
+								if (stream==null) {
+									trace("parameter value : NULL");
+								} else {
+									trace("parameter value will be streamed");
+								}
+							}
+						} else {
+							paramBytes = reader.getExtData(checkNullability);
+							if (paramBytes==null || !useSetBinaryStream) {
+								ps.setBytes(i+1, paramBytes);
+							} else {
+								ps.setBinaryStream(i+1, new ByteArrayInputStream(paramBytes),
+												   paramBytes.length);
+							}
+							if (SanityManager.DEBUG) {
+								if (paramBytes==null) {
+									trace("parameter value : NULL");
+								} else {
+									trace("parameter value is a LOB with length:" +
+										  paramBytes.length);
+								}
 							}
 						}
-						ps.setBytes(i+1, paramBytes);
 						break;
 					case DRDAConstants.DRDA_TYPE_LOBCSBCS:
 					case DRDAConstants.DRDA_TYPE_NLOBCSBCS:
+						paramBytes = reader.getExtData(checkNullability);
 						paramString = new String(paramBytes, stmt.ccsidSBCEncoding);
 						if (SanityManager.DEBUG)
 							trace("parameter value is: "+ paramString);
@@ -4520,6 +4567,7 @@
 						break;
 					case DRDAConstants.DRDA_TYPE_LOBCDBCS:
 					case DRDAConstants.DRDA_TYPE_NLOBCDBCS:
+						paramBytes = reader.getExtData(checkNullability);
 						paramString = new String(paramBytes, stmt.ccsidDBCEncoding );
 						if (SanityManager.DEBUG)
 							trace("parameter value is: "+ paramString);
@@ -4527,12 +4575,16 @@
 						break;
 					case DRDAConstants.DRDA_TYPE_LOBCMIXED:
 					case DRDAConstants.DRDA_TYPE_NLOBCMIXED:
+						paramBytes = reader.getExtData(checkNullability);
 						paramString = new String(paramBytes, stmt.ccsidMBCEncoding);
 						if (SanityManager.DEBUG)
 							trace("parameter value is: "+ paramString);
 						ps.setString(i+1,paramString);
 						break;
 					default:
+						paramBytes = null;
+						paramString = "";
+
 						invalidValue(drdaType);
 				}
 			     

Added: db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/EXTDTAReaderInputStream.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/EXTDTAReaderInputStream.java?rev=438478&view=auto
==============================================================================
--- db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/EXTDTAReaderInputStream.java (added)
+++ db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/EXTDTAReaderInputStream.java Wed Aug 30 06:06:05 2006
@@ -0,0 +1,176 @@
+/*
+   Derby - Class org.apache.derby.impl.drda.EXTDTAReaderInputStream
+
+   Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+   
+   http://www.apache.org/licenses/LICENSE-2.0
+   
+   Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.
+*/
+package org.apache.derby.impl.drda;
+import java.io.InputStream;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+/**
+ * Implementation of InputStream which gets EXTDTA from the DDMReader.
+ * This class can be used to stream LOBs from the Network client to the
+ * Network server.
+ */
+final class EXTDTAReaderInputStream extends InputStream 
+{
+    /**
+     * Takes the stream length from the reader and reads the first chunk.
+     * @param reader The reader to get data from
+     * @exception DRDAProtocolException if thrown while initializing current
+     *                                  buffer.
+     */
+    EXTDTAReaderInputStream(final DDMReader reader) 
+        throws DRDAProtocolException
+    {
+        super();
+        this.reader = reader;
+        this.length = reader.getDdmLength();        
+        this.remainingBytes = length;
+        this.currentBuffer = 
+            reader.readLOBInitStream(remainingBytes);
+    }
+
+    /**
+     * Reads the next byte of data from the input stream.
+     *
+     * <p> This subclass of InputStream implements this method by reading
+     * the next byte from the current buffer. When the current buffer is
+     * exhausted, a new buffer is requested from the DDMReader.
+     *
+     * @return     the next byte of data, or <code>-1</code> if the end of the
+     *             stream is reached.
+     * @exception  IOException  if an I/O error occurs.
+     * @see        java.io.InputStream#read()
+     */
+    public final int read() 
+        throws IOException
+    {
+        if (remainingBytes <= 0) {
+            return -1;
+        }
+        int val = (currentBuffer == null) ? -1 : currentBuffer.read();
+        if (val < 0) {
+            val = refreshCurrentBuffer();
+        }
+        remainingBytes--; // NOTE(review): also decremented if the refreshed buffer returned -1
+        return val;
+    }
+    
+    /**
+     * Reads up to <code>len</code> bytes of data from the input stream into
+     * an array of bytes.  An attempt is made to read as many as
+     * <code>len</code> bytes, but a smaller number may be read, possibly
+     * zero. The number of bytes actually read is returned as an integer.
+     *
+     * This subclass implements this method by calling this method on the
+     * current buffer, which is an instance of ByteArrayInputStream. If the
+     * current buffer does not have any data, a new buffer is requested
+     * from the DDMReader and the read is retried once on that buffer.
+     *
+     * @param      b     the buffer into which the data is read.
+     * @param      off   the start offset in array <code>b</code>
+     *                   at which the data is written.
+     * @param      len   the maximum number of bytes to read.
+     * @return     the total number of bytes read into the buffer, or
+     *             <code>-1</code> if there is no more data because the end of
+     *             the stream has been reached.
+     * @exception  IOException  if an I/O error occurs.
+     * @exception  NullPointerException  if <code>b</code> is <code>null</code>.
+     * @see        java.io.InputStream#read(byte[], int, int)
+     */
+    public final int read(final byte[] b,
+                          final int off,
+                          final int len) 
+        throws IOException
+    {
+        if (remainingBytes <= 0) {
+            return -1;
+        }
+        int val = currentBuffer.read(b, off, len); // NOTE(review): no null check here, unlike read()
+        if (val < 0) {
+            currentBuffer = 
+                reader.readLOBContinuationStream(remainingBytes);
+            val = currentBuffer.read(b, off, len);
+        }
+        remainingBytes -= val; // NOTE(review): val is still -1 if the new buffer was empty
+        return val;
+    }
+
+    /**
+     * Returns the number of bytes that can be read (or skipped over) from
+     * this input stream without blocking by the next caller of a method for
+     * this input stream.
+     *
+     * <p> This subclass implements this method by calling available on
+     *     the current buffer, which is a ByteArrayInputStream.
+     *
+     * @return     the number of bytes that can be read from this input stream
+     *             without blocking; 0 once no bytes remain in the stream.
+     */
+    public final int available() 
+    {
+        if (remainingBytes <= 0) {
+            return 0;
+        }
+        return currentBuffer.available();
+    }
+
+    /**
+     * Return the length of this stream. The length includes data which has
+     * been read.
+     * @return length of this stream.
+     */
+    final long getLength() 
+    {
+        return length;
+    }
+    
+    /**
+     * Refresh the current buffer from the DDMReader
+     * @exception IOException if there is an IOException when
+     *                        refreshing the buffer from DDMReader
+     * @return the next byte of data, or <code>-1</code> if the end of the
+     *         stream is reached.
+     */
+    private int refreshCurrentBuffer() 
+        throws IOException
+    {
+        if (remainingBytes > 0) {
+            currentBuffer = 
+                reader.readLOBContinuationStream(remainingBytes);
+            return currentBuffer.read();
+        } else {
+            return -1;
+        }
+    }
+    
+    /** Length of stream, as reported by the DDMReader at construction */
+    private final long length;
+    
+    /** DDMReader. Used to get more data. */
+    private final DDMReader reader;
+    
+    /** Remaining bytes in stream */
+    private long remainingBytes;
+    
+    /** Current data buffer */
+    private ByteArrayInputStream currentBuffer;
+
+}

Propchange: db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/EXTDTAReaderInputStream.java
------------------------------------------------------------------------------
    svn:eol-style = native