You are viewing a plain text version of this content. The canonical link for it is here.
Posted to derby-commits@db.apache.org by kr...@apache.org on 2009/04/06 17:13:45 UTC

svn commit: r762384 - in /db/derby/code/trunk/java: engine/org/apache/derby/iapi/types/ engine/org/apache/derby/loc/ shared/org/apache/derby/shared/common/reference/ testing/org/apache/derbyTesting/unitTests/junit/

Author: kristwaa
Date: Mon Apr  6 15:13:44 2009
New Revision: 762384

URL: http://svn.apache.org/viewvc?rev=762384&view=rev
Log:
DERBY-4122: java/testing/org/apache/derbyTesting/unitTests/junit/ReaderToUTF8StreamTest.java.
Added mark/reset functionality to ReaderToUTF8Stream.
Made SQLClob use mark/reset to rewind the data stream when too many bytes are
read as part of the stream header parsing. This happens when reading Clobs
written with the pre-10.5 header format, either in soft or hard upgraded
databases.
Added a new error message.
Added unit tests for mark/reset.

Patch file: derby-4122-4c-classcast_fix_mark_reset.diff


Added:
    db/derby/code/trunk/java/testing/org/apache/derbyTesting/unitTests/junit/ReaderToUTF8StreamTest.java   (with props)
Modified:
    db/derby/code/trunk/java/engine/org/apache/derby/iapi/types/ReaderToUTF8Stream.java
    db/derby/code/trunk/java/engine/org/apache/derby/iapi/types/SQLClob.java
    db/derby/code/trunk/java/engine/org/apache/derby/loc/messages.xml
    db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/MessageId.java
    db/derby/code/trunk/java/testing/org/apache/derbyTesting/unitTests/junit/_Suite.java

Modified: db/derby/code/trunk/java/engine/org/apache/derby/iapi/types/ReaderToUTF8Stream.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/iapi/types/ReaderToUTF8Stream.java?rev=762384&r1=762383&r2=762384&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/iapi/types/ReaderToUTF8Stream.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/iapi/types/ReaderToUTF8Stream.java Mon Apr  6 15:13:44 2009
@@ -30,6 +30,7 @@
 import org.apache.derby.iapi.services.io.DerbyIOException;
 import org.apache.derby.iapi.services.io.LimitReader;
 import org.apache.derby.iapi.services.sanity.SanityManager;
+import org.apache.derby.shared.common.reference.MessageId;
 
 /**
  * Converts the characters served by a {@code java.io.Reader} to a stream
@@ -39,6 +40,7 @@
  * Length validation is performed. If required and allowed by the target column
  * type, truncation of blanks will also be performed.
  */
+//@NotThreadSafe
 public final class ReaderToUTF8Stream
 	extends InputStream
 {
@@ -50,13 +52,22 @@
     /** Constant indicating the first iteration of {@code fillBuffer}. */
     private final static int FIRST_READ = Integer.MIN_VALUE;
     /**
-     * Size of buffer to hold the data read from stream and converted to the
-     * modified UTF-8 format.
+     * Constant indicating that no mark is set in the stream, or that the read
+     * ahead limit of the mark has been exceeded.
      */
-    private final static int BUFSIZE = 32768;
-    private byte[] buffer = new byte[BUFSIZE];
+    private final static int MARK_UNSET_OR_EXCEEDED = -1;
+    /**
+     * Buffer to hold the data read from stream and converted to the modified
+     * UTF-8 format. The initial size is 32 KB, but it may grow if the
+     * {@linkplain #mark(int)} is invoked.
+     */
+    private byte[] buffer = new byte[32*1024];
 	private int boff;
     private int blen = -1;
+    /** Stream mark, set through {@linkplain #mark(int)}. */
+    private int mark = MARK_UNSET_OR_EXCEEDED;
+    /** Read ahead limit for mark, set through {@linkplain #mark(int)}. */
+    private int readAheadLimit;
 	private boolean eof;
     /** Tells if the stream content is/was larger than the buffer size. */
 	private boolean multipleBuffer;
@@ -298,11 +309,42 @@
             // Make startingOffset point at the first byte after the header.
             startingOffset = headerLength;
         }
-		int off = boff = startingOffset;
+		int off = startingOffset;
+        // In the case of a mark, the offset may be adjusted.
+        // Do not change boff in the encoding loop. Before the encoding loop
+        // starts, it shall point at the next byte the stream will deliver on
+        // the next iteration of read or skip.
+        boff = 0;
 
 		if (off == 0)
 			multipleBuffer = true;
 
+        // If we have a mark set, see if we have to expand the buffer, or if we
+        // are going to read past the read ahead limit and can invalidate the
+        // mark and discard the data currently in the buffer.
+        if (mark >= 0) {
+            // Add 6 bytes reserved for one 3 byte character encoding and the
+            // 3 byte Derby EOF marker (see encoding loop further down).
+            int spaceRequired = readAheadLimit + 6;
+            if (mark + spaceRequired > buffer.length) {
+                if (blen != -1) {
+                    // Calculate the new offset, as we may have to shift bytes
+                    // we have already delivered to the left.
+                    boff = off = blen - mark;
+                }
+                byte[] oldBuf = buffer;
+                if (spaceRequired > buffer.length) {
+                    // We have to allocate a bigger buffer to save the bytes.
+                    buffer = new byte[spaceRequired];
+                }
+                System.arraycopy(oldBuf, mark, buffer, 0, off);
+                mark = 0;
+            } else if (blen != -1) {
+                // Invalidate the mark.
+                mark = MARK_UNSET_OR_EXCEEDED;
+            }
+        }
+
 		// 6! need to leave room for a three byte UTF8 encoding
 		// and 3 bytes for our special end of file marker.
 		for (; off <= buffer.length - 6; )
@@ -332,8 +374,6 @@
 		}
 
 		blen = off;
-		boff = 0;
-
 		if (eof)
 			checkSufficientData();
 	}
@@ -487,6 +527,63 @@
        // from the reader object 
        // reader.getLimit() returns the remaining bytes available
        // on this stream
-       return (BUFSIZE > remainingBytes ? remainingBytes : BUFSIZE);
+       return (buffer.length > remainingBytes ? remainingBytes : buffer.length);
+    }
+
+    /**
+     * Marks the current position in the stream.
+     * <p>
+     * Note that this stream is not marked at position zero by default (i.e.
+     * in the constructor).
+     *
+     * @param readAheadLimit the maximum limit of bytes that can be read before
+     *      the mark position becomes invalid
+     */
+    public void mark(int readAheadLimit) {
+        if (readAheadLimit > 0) {
+            this.readAheadLimit = readAheadLimit;
+            mark = boff;
+        } else {
+            this.readAheadLimit = mark = MARK_UNSET_OR_EXCEEDED;
+        }
+    }
+
+    /**
+     * Repositions this stream to the position at the time the mark method was
+     * last called on this input stream.
+     *
+     * @throws EOFException if the stream has been closed
+     * @throws IOException if no mark has been set, or the read ahead limit of
+     *      the mark has been exceeded
+     */
+    public void reset()
+            throws IOException {
+        // Throw execption if the stream has been closed implicitly or
+        // explicitly.
+        if (buffer == null) {
+            throw new EOFException(MessageService.getTextMessage
+                    (SQLState.STREAM_EOF));
+        }
+        // Throw exception if the mark hasn't been set, or if we had to refill
+        // the internal buffer after we had read past the read ahead limit.
+        if (mark == MARK_UNSET_OR_EXCEEDED) {
+            throw new IOException(MessageService.getTextMessage(
+                    MessageId.STREAM_MARK_UNSET_OR_EXCEEDED));
+        }
+        // Reset successful, adjust state.
+        boff = mark;
+        readAheadLimit = mark = MARK_UNSET_OR_EXCEEDED;
+    }
+
+    /**
+     * Tests if this stream supports mark/reset.
+     * <p>
+     * The {@code markSupported} method of {@code ByteArrayInputStream} always
+     * returns {@code true}.
+     *
+     * @return {@code true}, mark/reset is always supported.
+     */
+    public boolean markSupported() {
+        return true;
     }
 }

Modified: db/derby/code/trunk/java/engine/org/apache/derby/iapi/types/SQLClob.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/iapi/types/SQLClob.java?rev=762384&r1=762383&r2=762384&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/iapi/types/SQLClob.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/iapi/types/SQLClob.java Mon Apr  6 15:13:44 2009
@@ -32,6 +32,8 @@
 import org.apache.derby.iapi.services.sanity.SanityManager;
 import org.apache.derby.iapi.util.UTF8Util;
 
+import org.apache.derby.shared.common.reference.SQLState;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInput;
@@ -331,6 +333,11 @@
                 // Assume new header format, adjust later if necessary.
                 byte[] header = new byte[MAX_STREAM_HEADER_LENGTH];
                 int read = stream.read(header);
+                // Expect at least two header bytes.
+                if (SanityManager.DEBUG) {
+                    SanityManager.ASSERT(read > 1,
+                            "Too few header bytes: " + read);
+                }
                 HeaderInfo hdrInfo = investigateHeader(header, read);
                 if (read > hdrInfo.headerLength()) {
                     // We have read too much. Reset the stream.
@@ -346,6 +353,21 @@
                     byteLength(hdrInfo.byteLength()).
                     charLength(hdrInfo.charLength()).build();
             } catch (IOException ioe) {
+                // Check here to see if the root cause is a container closed
+                // exception. If so, this most likely means that the Clob was
+                // accessed after a commit or rollback on the connection.
+                Throwable rootCause = ioe;
+                while (rootCause.getCause() != null) {
+                    rootCause = rootCause.getCause();
+                }
+                if (rootCause instanceof StandardException) {
+                    StandardException se = (StandardException)rootCause;
+                    if (se.getMessageId().equals(
+                            SQLState.DATA_CONTAINER_CLOSED)) {
+                        throw StandardException.newException(
+                                SQLState.BLOB_ACCESSED_AFTER_COMMIT, ioe);
+                    }
+                }
                 throwStreamingIOException(ioe);
             }
         }
@@ -512,11 +534,19 @@
             long vcl = vc.length();
             if (vcl < 0L || vcl > Integer.MAX_VALUE)
                 throw this.outOfRange();
-
-            ReaderToUTF8Stream utfIn = new ReaderToUTF8Stream(
-                    vc.getCharacterStream(), (int) vcl, 0, TypeId.CLOB_NAME,
-                    getStreamHeaderGenerator());
-            setValue(utfIn, (int) vcl);
+            // For small values, just materialize the value.
+            // NOTE: Using streams for the empty string ("") isn't supported
+            // down this code path when in soft upgrade mode, because the code
+            // reading the header bytes ends up reading zero bytes (i.e., it
+            // doesn't get the header / EOF marker).
+            if (vcl < 32*1024) {
+                setValue(vc.getSubString(1, (int)vcl));
+            } else {
+                ReaderToUTF8Stream utfIn = new ReaderToUTF8Stream(
+                        vc.getCharacterStream(), (int) vcl, 0, TypeId.CLOB_NAME,
+                        getStreamHeaderGenerator());
+                setValue(utfIn, (int) vcl);
+            }
         } catch (SQLException e) {
             throw dataTypeConversion("DAN-438-tmp");
        }
@@ -658,17 +688,42 @@
             // Make sure the stream is correctly positioned.
             rewindStream(hdrLen);
         } else {
+            final boolean markSet = stream.markSupported();
+            if (markSet) {
+                stream.mark(MAX_STREAM_HEADER_LENGTH);
+            }
             byte[] header = new byte[MAX_STREAM_HEADER_LENGTH];
             int read = in.read(header);
+            // Expect at least two header bytes.
+            if (SanityManager.DEBUG) {
+                SanityManager.ASSERT(read > 1, "Too few header bytes: " + read);
+            }
             hdrInfo = investigateHeader(header, read);
             if (read > hdrInfo.headerLength()) {
                 // We read too much data, reset and position on the first byte
                 // of the user data.
-                rewindStream(hdrInfo.headerLength());
+                // First see if we set a mark on the stream and can reset it.
+                // If not, try using the Resetable interface.
+                if (markSet) {
+                    // Stream is not a store Resetable one, use mark/reset
+                    // functionality instead.
+                    stream.reset();
+                    InputStreamUtil.skipFully(stream, hdrInfo.headerLength());
+                } else if (stream instanceof Resetable) {
+                    // We have a store stream.
+                    rewindStream(hdrInfo.headerLength());
+                }
             }
         }
         // The data will be materialized in memory, in a char array.
-        super.readExternal(in, hdrInfo.byteLength(), hdrInfo.charLength());
+        // Subtract the header length from the byte length if there is a byte
+        // encoded in the header, otherwise the decode routine will try to read
+        // too many bytes.
+        int byteLength = 0; // zero is interpreted as unknown / unset
+        if (hdrInfo.byteLength() != 0) {
+            byteLength = hdrInfo.byteLength() - hdrInfo.headerLength();
+        }
+        super.readExternal(in, byteLength, hdrInfo.charLength());
     }
 
     /**
@@ -686,6 +741,10 @@
         int prevPos = in.getPosition();
         byte[] header = new byte[MAX_STREAM_HEADER_LENGTH];
         int read = in.read(header);
+        // Expect at least two header bytes.
+        if (SanityManager.DEBUG) {
+            SanityManager.ASSERT(read > 1, "Too few header bytes: " + read);
+        }
         HeaderInfo hdrInfo = investigateHeader(header, read);
         if (read > hdrInfo.headerLength()) {
             // Reset stream. This path will only be taken for Clobs stored

Modified: db/derby/code/trunk/java/engine/org/apache/derby/loc/messages.xml
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/loc/messages.xml?rev=762384&r1=762383&r2=762384&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/loc/messages.xml (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/loc/messages.xml Mon Apr  6 15:13:44 2009
@@ -7401,6 +7401,11 @@
                 <arg>errorMessage</arg>
             </msg>
 
+            <msg>
+                <name>I027</name>
+                <text>No mark set, or mark read ahead limit exceeded.</text>
+            </msg>
+
         </family>
 
 

Modified: db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/MessageId.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/MessageId.java?rev=762384&r1=762383&r2=762384&view=diff
==============================================================================
--- db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/MessageId.java (original)
+++ db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/MessageId.java Mon Apr  6 15:13:44 2009
@@ -184,6 +184,11 @@
 	String CORE_DATABASE_NOT_AVAILABLE	= "I024"; // Database not available
 	String CORE_DRIVER_NOT_AVAILABLE	= "I025"; // JDBC Driver not available
 	String JDBC_DRIVER_REGISTER_ERROR 	= "I026"; // Error while registering driver
+    /**
+     * At the time InputStream.reset was invoked, either no mark was set or the
+     * read ahead limit of the mark was exceeded.
+     */
+    String STREAM_MARK_UNSET_OR_EXCEEDED                    = "I027";
 
     /*
      * Monitor

Added: db/derby/code/trunk/java/testing/org/apache/derbyTesting/unitTests/junit/ReaderToUTF8StreamTest.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/testing/org/apache/derbyTesting/unitTests/junit/ReaderToUTF8StreamTest.java?rev=762384&view=auto
==============================================================================
--- db/derby/code/trunk/java/testing/org/apache/derbyTesting/unitTests/junit/ReaderToUTF8StreamTest.java (added)
+++ db/derby/code/trunk/java/testing/org/apache/derbyTesting/unitTests/junit/ReaderToUTF8StreamTest.java Mon Apr  6 15:13:44 2009
@@ -0,0 +1,686 @@
+/*
+
+   Derby - Class org.apache.derbyTesting.unitTests.junit.ReaderToUTF8StreamTest
+
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to you under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+ */
+
+package org.apache.derbyTesting.unitTests.junit;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
+import java.util.Arrays;
+import java.util.Random;
+import junit.framework.Test;
+import junit.framework.TestSuite;
+import org.apache.derby.iapi.services.io.InputStreamUtil;
+import org.apache.derby.iapi.types.ClobStreamHeaderGenerator;
+import org.apache.derby.iapi.types.ReaderToUTF8Stream;
+import org.apache.derbyTesting.functionTests.util.streams.CharAlphabet;
+import org.apache.derbyTesting.functionTests.util.streams.LoopingAlphabetReader;
+import org.apache.derbyTesting.junit.BaseTestCase;
+
+/**
+ * Unit tests for ReaderToUTF8Stream.
+ * <p>
+ * Explicit tests for the mark/reset feature start with "testMark".
+ */
+public class ReaderToUTF8StreamTest
+        extends BaseTestCase {
+
+    /**
+     * The default size of the internal buffer in ReaderToUTF8Stream. Used to
+     * trigger specific events in the reader.
+     */
+    private static int DEFAULT_INTERNAL_BUFFER_SIZE = 32*1024;
+
+    public ReaderToUTF8StreamTest(String name) {
+        super(name);
+    }
+
+    public static Test suite() {
+        return new TestSuite(ReaderToUTF8StreamTest.class);
+    }
+
+    /**
+     * Tests a very basic use of the mark/reset mechanism.
+     *
+     * @throws IOException if something goes wrong
+     */
+    public void testMarkResetSimplePosZero()
+            throws IOException {
+        InputStream is = getStream(100);
+        is.mark(10);
+        assertEquals(10, is.read(new byte[10]));
+        is.reset();
+        checkBeginningOfStream(is);
+    }
+
+    /**
+     * Tests a very basic use of the mark/reset mechanism.
+     *
+     * @throws IOException if something goes wrong
+     */
+    public void testMarkResetSimplePosNonZero()
+            throws IOException {
+        InputStream is = getStream(200);
+        assertEquals(127, is.read(new byte[127]));
+        is.mark(10);
+        byte[] readBeforeReset = new byte[10];
+        byte[] readAfterReset = new byte[10];
+        assertEquals(10, is.read(readBeforeReset));
+        is.reset();
+        assertEquals(10, is.read(readAfterReset));
+        assertTrue(Arrays.equals(readBeforeReset, readAfterReset));
+    }
+
+    /**
+     * Tests that shifting of existing bytes works.
+     *
+     * @throws IOException if something goes wrong
+     */
+    public void testMarkResetShiftBytesFew_Internal()
+            throws IOException {
+        InputStream is = getStream(128*1024);
+        byte[] buf = new byte[DEFAULT_INTERNAL_BUFFER_SIZE - 2*1024];
+        fillArray(is, buf);
+        // The following mark fits within the existing default buffer, but the
+        // bytes after the mark have to be shifted to the left.
+        is.mark(4*1024);
+        byte[] readBeforeReset = new byte[3*1024];
+        byte[] readAfterReset = new byte[3*1024];
+        fillArray(is, readBeforeReset);
+        // Obtain something to compare with.
+        InputStream src = getStream(128*1024);
+        InputStreamUtil.skipFully(src, DEFAULT_INTERNAL_BUFFER_SIZE - 2*1024);
+        byte[] comparisonRead = new byte[3*1024];
+        fillArray(src, comparisonRead);
+        // Compare
+        assertEquals(new ByteArrayInputStream(comparisonRead),
+                     new ByteArrayInputStream(readBeforeReset));
+        // Reset the stream.
+        is.reset();
+        fillArray(is, readAfterReset);
+        assertEquals(new ByteArrayInputStream(readBeforeReset),
+                     new ByteArrayInputStream(readAfterReset));
+    }
+
+    /**
+     * Tests that shifting of existing bytes works.
+     *
+     * @throws IOException if something goes wrong
+     */
+    public void testMarkResetShiftBytesMany_Internal()
+            throws IOException {
+        InputStream is = getStream(128*1024);
+        is.read();
+        is.read();
+        // The following mark fits within the existing default buffer, but the
+        // bytes after the mark have to be shifted to the left.
+        is.mark(DEFAULT_INTERNAL_BUFFER_SIZE -6);
+        byte[] readBeforeReset = new byte[DEFAULT_INTERNAL_BUFFER_SIZE -6];
+        byte[] readAfterReset = new byte[DEFAULT_INTERNAL_BUFFER_SIZE -6];
+        fillArray(is, readBeforeReset);
+        // Obtain something to compare with.
+        InputStream src = getStream(128*1024);
+        src.read();
+        src.read();
+        byte[] comparisonRead = new byte[DEFAULT_INTERNAL_BUFFER_SIZE -6];
+        fillArray(src, comparisonRead);
+        // Compare
+        assertEquals(new ByteArrayInputStream(comparisonRead),
+                     new ByteArrayInputStream(readBeforeReset));
+        // Reset the stream.
+        is.reset();
+        fillArray(is, readAfterReset);
+        assertEquals(new ByteArrayInputStream(readBeforeReset),
+                     new ByteArrayInputStream(readAfterReset));
+    }
+
+    /**
+     * Tests an implementation specific feature of ReaderToUTF8Stream, which is
+     * that the mark isn't invalidated even though we read past the read ahead
+     * limit, given that the internal buffer doesn't have to be refilled.
+     * <p>
+     * <em>WARNING</em>:This implementation specific feature should not be
+     * relied on by the production code! It may change at any time.
+     *
+     * @throws IOException if something goes wrong
+     */
+    public void testMarkResetExceedReadAheadLimitOK_Internal()
+            throws IOException {
+        InputStream is = getStream(4*1024+17);
+        is.mark(10);
+        assertEquals(20, is.read(new byte[20]));
+        // Note the following is implementation dependent.
+        // Since the bytes are already stored in the internal buffer, we won't
+        // fail the reset even though we have exceeded the read ahead limit.
+        // With a different stream implementation, this may fail!
+        is.reset();
+    }
+
+    /**
+     * Tests that the reset-call will fail we exceed the mark ahead limit and
+     * the internal buffer has to be refilled.
+     *
+     * @throws IOException if something goes wrong
+     */
+    public void testMarkResetExceedReadAheadLimitFail_Internal()
+            throws IOException {
+        InputStream is = getStream(64*1024+17);
+        is.mark(10);
+        // The internal buffer is 32 KB (implementation detail).
+        int toRead = 38*1024+7;
+        int read = 0;
+        byte[] buf = new byte[toRead];
+        while (read < toRead) {
+            read += is.read(buf, read, toRead - read);
+        }
+        // Note the following is implementation dependent.
+        try {
+            is.reset();
+            fail("reset-call was expected to throw IOException");
+        } catch (IOException ioe) {
+            // As expected, do nothing
+        }
+    }
+
+    /**
+     * Reads almost enough bytes to read past the read ahead limit, then tests
+     * that the reset works. After that, reads past the read ahead limit and
+     * tests that the reset fails.
+     *
+     * @throws IOException if something goes wrong
+     */
+    public void testMarkResetOverflowInternalBufferKeepBytes()
+            throws IOException {
+        InputStream is = getStream(128*1024);
+        is.mark(120*1024);
+        byte[] buf = new byte[120*1024-1];
+        fillArray(is, buf);
+        is.reset();
+        checkBeginningOfStream(is);
+
+        // Again, but this time read past the read ahead limit.
+        is = getStream(36*1024);
+        is.mark(4*1024);
+        buf = new byte[36*1024-1];
+        fillArray(is, buf);
+        try {
+            is.reset();
+            fail("reset-call was expected to throw IOException");
+        }  catch (IOException ioe) {
+            // As expected, do nothing
+        }
+    }
+
+    /**
+     * Marks the stream with a read ahead limit larger than the stream itself,
+     * then reads until the end of the stream.
+     * <p>
+     * The current implementation does not allow the stream to be reset after
+     * the last byte in the stream has been read once.
+     *
+     * @throws IOException if something goes wrong
+     */
+    public void testMarkReadUntilEOF()
+            throws IOException {
+        // Try with a single buffer fill first.
+        InputStream is = getStream(4*1024);
+        is.mark(8*1024);
+        byte[] buf = new byte[8*1024];
+        int read = 0;
+        while (true) {
+            int readNow = is.read(buf, read, buf.length - read);
+            if (readNow == -1) {
+                break;
+            }
+            read += readNow;
+        }
+        try {
+            is.reset();
+            fail("reset-call was expected to throw IOException");
+        } catch (IOException ioe) {
+            // The current implementation does not allow resetting the stream
+            // when the source stream itself has been drained and all the data
+            // has been read once.
+        }
+
+        // Now try with multiple buffer fills.
+        is = getStream(640*1024);
+        is.mark(128*1024);
+        buf = new byte[8*1024];
+        while (true) {
+            // Just drain the stream.
+            if (is.read(buf, 0, buf.length) == -1) {
+                break;
+            }
+        }
+        try {
+            is.reset();
+            fail("reset-call was expected to throw IOException");
+        } catch (IOException ioe) {
+            // The current implementation does not allow resetting the stream
+            // when the source stream itself has been drained and all the data
+            // has been read once.
+        }
+    }
+
+    /**
+     * Marks the stream with a read ahead limit larger than the stream itself,
+     * then reads until just before the end of the stream.
+     *
+     * @throws IOException if something goes wrong
+     */
+    public void testMarkReadAlmostUntilEOF()
+            throws IOException {
+        // Try with a single buffer fill first.
+        int limit = 4*1024;
+        InputStream is = getStream(limit);
+        is.mark(8*1024);
+        byte[] buf = new byte[limit*2];
+        int read = 0;
+        while (read < limit -1) {
+            int readNow = is.read(buf, read, (limit -1) - read);
+            if (readNow == -1) {
+                break;
+            }
+            read += readNow;
+        }
+        // EOF has been reached when filling the internal buffer, but we still
+        // havent't read it. Therefore, the reset should succeed.
+        is.reset();
+        checkBeginningOfStream(is);
+    }
+
+    /**
+     * Makes sure that the header bytes are copied when creating a new buffer
+     * to hold all the required bytes when the stream has been marked.
+     * This will only happen the first time the buffer is filled, i.e. when the
+     * stream is marked before the first read (mark at position zero).
+     *
+     * @throws IOException if something goes wrong
+     */
+    public void testHeaderPresentInStream_Internal()
+            throws IOException {
+        final int valueLen = DEFAULT_INTERNAL_BUFFER_SIZE + 5*1024;
+        InputStream is = getStream(valueLen);
+        is.mark(valueLen - 1024);
+        // Obtain a header generator to compare with.
+        ClobStreamHeaderGenerator hdrGen = new ClobStreamHeaderGenerator(false);
+        byte[] hdrTmp = new byte[100];
+        int headerLen = hdrGen.generateInto(hdrTmp, 0, valueLen);
+        byte[] hdr1 = new byte[headerLen];
+        System.arraycopy(hdrTmp, 0, hdr1, 0, headerLen);
+        byte[] hdr2 = new byte[headerLen];
+        // Get the first bytes from the stream being tested.
+        assertEquals(headerLen, is.read(hdr2));
+        assertEquals(new ByteArrayInputStream(hdr1),
+                     new ByteArrayInputStream(hdr2));
+    }
+
+    /**
+     * Returns a stream to test, loaded with the repeating modern latin
+     * lowercase alphabet.
+     *
+     * @param length the length of the stream in characters
+     * @return A stream serving bytes.
+     */
+    private InputStream getStream(int length) {
+        Reader src = new LoopingAlphabetReader(length,
+                                        CharAlphabet.modernLatinLowercase());
+        InputStream is = new ReaderToUTF8Stream(
+                src, length, 0, "CLOB", new ClobStreamHeaderGenerator(false));
+        assertTrue("The stream doesn't support mark/reset", is.markSupported());
+        return is;
+    }
+
+    /**
+     * Checks the beginning of the stream, which is expected to consist of five
+     * header bytes (skipped) followed by the bytes for the characters 'a' and
+     * 'b'.
+     *
+     * @param is the stream to check
+     * @throws IOException if reading from the stream fails
+     * @throws AssertionFailedError if the stream content isn't as expected
+     */
+    private void checkBeginningOfStream(InputStream is)
+            throws IOException {
+        assertEquals(5, is.skip(5));
+        // We should now get the character a, followed by b.
+        assertEquals((byte)'a', is.read());
+        assertEquals((byte)'b', is.read());
+    }
+
+    /**
+     * Fills the array by reading from the stream.
+     *
+     * @param is input stream to read from
+     * @param b array to fill with bytes from the stream
+     * @throws IOException if reading from the array fails, or the end of the
+     *      stream is reached
+     */
+    private void fillArray(InputStream is, byte[] b)
+            throws IOException {
+        final int toRead = b.length;
+        int read = 0;
+        while (read < toRead) {
+            int readNow = is.read(b, read, toRead - read);
+            assertTrue("reached EOF", readNow != -1);
+            read += readNow;
+        }
+    }
+
+    /**
+     * Performs a series of random operations on a {@code ReaderToUTF8Stream},
+     * consisting of read, skip, mark, reset and a noop.
+     * <p>
+     * <em>Note</em>: Turn on debugging (derby.tests.debug=true) to see some
+     * information, turn on tracing (derby.tests.trace=true) in addition to see
+     * a lot more information.
+     * <p>
+     * If the test fails, the seed will be reported in the error message, and
+     * the load that failed can be rerun.
+     *
+     * @throws IOException if the test fails
+     */
+    public void testRandomSequence()
+            throws IOException {
+        final long seed = System.currentTimeMillis();
+        try {
+            testRandomSequence(seed);
+        } catch (IOException ioe) {
+            // Report the seed for repeatability.
+            IOException wrapper = new IOException("seed=" + seed);
+            wrapper.initCause(ioe);
+            throw wrapper;
+        }
+    }
+
+    /**
+     * Performs a series of random operations on a {@code ReaderToUTF8Stream},
+     * consisting of read, skip, mark, reset and a noop.
+     * <p>
+     * Note that this test verifies that executing the operations don't fail,
+     * but it doesn't verify that the bytes obtained from the stream are the
+     * correct ones.
+     *
+     * @param seed seed controlling the test load
+     * @throws IOException if the test fails
+     */
+    private void testRandomSequence(long seed)
+            throws IOException {
+        println("testRandomSequence seed: " + seed);
+        final int iterations = 100;
+        final Random rng = new Random(seed);
+        for (int i=0; i < iterations; i++) {
+            // Operation counters.
+            int reads = 0, skips = 0, resets = 0, marks = 0, invalidations = 0;
+            // Stream length (up to ~1 MB).
+            int length = 1024*rng.nextInt(1024) + rng.nextInt(1024);
+            boolean rs = rng.nextBoolean();
+            println(">>> iteration " + i + ", length=" + length);
+            int currentPos = 0;
+            int limit = 0;
+            int mark = -1;
+            InputStream is = getStream(length);
+            int ops = 0;
+            while (ops < 200 && currentPos < length - 10) {
+                if (rng.nextBoolean()) { // Whether to read/skip or mark/reset.
+                    int toRead = getRandomLength(currentPos, length, rng, rs);
+                    if (rng.nextBoolean()) {
+                        // Read
+                        mytrace("\treading " + toRead + " bytes");
+                        reads++;
+                        is.read(new byte[toRead]);
+                    } else {
+                        // Skip
+                        mytrace("\tskipping " + toRead + " bytes");
+                        skips++;
+                        is.skip(toRead);
+                    }
+                    currentPos += toRead;
+                    if (mark != -1 && (currentPos - mark) > limit) {
+                        mytrace("\t\tmark invalidated");
+                        invalidations++;
+                        mark = -1;
+                        limit = 0;
+                    }
+                }
+                if (rng.nextBoolean()) { // Whether to read/skip or mark/reset.
+                    // Mark/reset, or do nothing.
+                    if (rng.nextBoolean()) {
+                        if (rng.nextInt(100) < 40 && mark != -1) {
+                            // Reset
+                            mytrace("\tresetting to position " + mark);
+                            resets++;
+                            is.reset();
+                            currentPos = mark;
+                            mark = -1;
+                        } else {
+                            // Mark
+                            limit = getRandomLength(currentPos, length, rng);
+                            mytrace("\tmarking position " + currentPos +
+                                    " with limit " + limit);
+                            marks++;
+                            mark = currentPos;
+                            is.mark(limit);
+                        }
+                    }
+                }
+                ops++;
+            }
+            println("ops=" + ops + ", reads=" + reads + ", skips=" + skips +
+                    ", marks=" + marks + ", resets=" + resets +
+                    ", invalidations=" + invalidations);
+        }
+    }
+
+
+    /**
+     * Returns a random length within the limits.
+     * <p>
+     * This call will operate in the full range of the remaining bytes.
+     *
+     * @param currentPos the current position of the stream
+     * @param length the length of the stream
+     * @param rng random generator
+     * @return A random length within the limits of the stream.
+     */
+    private int getRandomLength(int currentPos, int length, Random rng) {
+        return getRandomLength(currentPos, length, rng, false);
+    }
+
+    /**
+     * Returns a random length within the limits.
+     *
+     * @param currentPos the current position of the stream
+     * @param length the length of the stream
+     * @param rng random generator
+     * @param reducedSize whether to return smaller number or not
+     *      (setting to true may increase the number of operations that will be
+     *      performed on a stream before it is exhausted)
+     * @return A random length within the limits of the stream.
+     */
+    private int getRandomLength(int currentPos, int length, Random rng, boolean reducedSize) {
+        int max = length - currentPos;
+        if (reducedSize) {
+            max = max / 5;
+        }
+        return (1 + (int)(max * rng.nextFloat()));
+    }
+
+    /**
+     * Trace only if both trace and verbose is true in the test configuration.
+     *
+     * @param str the string to print
+     */
+    private void mytrace(String str) {
+        if (getTestConfiguration().isVerbose()) {
+            traceit(str);
+        }
+    }
+
+    /**
+     * Tests mark/reset functionality by comparing with
+     * {@code ByteArrayInputStream}.
+     *
+     * @throws IOException if the test fails
+     */
+    public void testMarkReset1()
+            throws IOException {
+        InputStream is = getStream(64*1024);
+        byte[] srcBuf = new byte[64*1024+5];
+        fillArray(is, srcBuf);
+        InputStream src = new ByteArrayInputStream(srcBuf);
+        // Reinitialize the stream.
+        is = getStream(64*1024);
+
+        StreamUtil su = new StreamUtil(is, src);
+        su.mark(1024);
+        su.skip(17);
+        su.reset();
+        su.read(1);
+        su.read(2133);
+        su.mark(1024);
+        su.reset();
+        su.mark(1024);
+        su.skip(18);
+        su.read(1024);
+    }
+
+    /**
+     * Tests mark/reset functionality by comparing with
+     * {@code ByteArrayInputStream}. This test relies on knowing the size of
+     * the internal buffer to force a shifting of existing bytes to take place.
+     *
+     * @throws IOException if the test fails
+     */
+    public void testMarkReset2_Internal()
+            throws IOException {
+        InputStream is = getStream(128*1024);
+        byte[] srcBuf = new byte[128*1024+5];
+        fillArray(is, srcBuf);
+        InputStream src = new ByteArrayInputStream(srcBuf);
+        // Reinitialize the stream.
+        is = getStream(128*1024);
+
+        StreamUtil su = new StreamUtil(is, src);
+        su.skip(DEFAULT_INTERNAL_BUFFER_SIZE);
+        su.mark(DEFAULT_INTERNAL_BUFFER_SIZE + 2*1024);
+        su.read(1024);
+        su.reset();
+        su.read(3*1024);
+    }
+
+    /**
+     * Utility class executing a few selected method calls on two streams,
+     * expecting both of them to behave in the same way.
+     */
+    private class StreamUtil {
+        private final InputStream is1;
+        private final InputStream is2;
+
+        StreamUtil(InputStream is1, InputStream is2) {
+            assertNotNull(is1);
+            assertNotNull(is2);
+            this.is1 = is1;
+            this.is2 = is2;
+        }
+
+        public void mark(int readAheadLimit) {
+            is1.mark(readAheadLimit);
+            is2.mark(readAheadLimit);
+        }
+
+        public void reset()
+                throws IOException {
+            is1.reset();
+            is2.reset();
+        }
+
+        public long skip(long skip)
+                throws IOException {
+            long skip1 = 0;
+            long skip2 = 0;
+            // Skip data in the first stream.
+            while (skip1 < skip) {
+                long skippedNow = is1.skip(skip - skip1);
+                if (skippedNow == -1) {
+                    fail("stream one reached EOF: " + is1.getClass());
+                }
+                skip1 += skippedNow;
+            }
+            // Skip data in the second stream.
+            while (skip2 < skip) {
+                long skippedNow = is2.skip(skip - skip2);
+                if (skippedNow == -1) {
+                    fail("stream two reached EOF: " + is2.getClass());
+                }
+                skip2 += skippedNow;
+            }
+            assertEquals(skip1, skip2);
+            return skip1;
+        }
+
+        public int read(int toRead)
+                throws IOException {
+            byte[] b1 = new byte[toRead];
+            byte[] b2 = new byte[toRead];
+            int read = read(b1, b2, false);
+            assertEquals(new ByteArrayInputStream(b1),
+                         new ByteArrayInputStream(b2));
+            return read;
+        }
+
+        public int read(byte[] b1, byte[] b2, boolean expectEOF)
+                throws IOException {
+            assertEquals("unequal sized arrays", b1.length, b2.length);
+            int read1 = 0;
+            int read2 = 0;
+            final int toRead = b1.length;
+            // Read from the first stream.
+            while (read1 < toRead) {
+                int readNow = is1.read(b1, read1, toRead - read1);
+                if (readNow == -1) {
+                    if (expectEOF) {
+                        break;
+                    } else {
+                        fail("stream one reached EOF: " + is1.getClass());
+                    }
+                }
+                read1 += readNow;
+            }
+            // Read from the second stream.
+            while (read2 < toRead) {
+                int readNow = is2.read(b2, read2, toRead - read2);
+                if (readNow == -1) {
+                    if (expectEOF) {
+                        break;
+                    } else {
+                        fail("stream two reached EOF: " + is2.getClass());
+                    }
+                }
+                read2 += readNow;
+            }
+            assertEquals(read1, read2);
+            return read1;
+        }
+    }
+}

Propchange: db/derby/code/trunk/java/testing/org/apache/derbyTesting/unitTests/junit/ReaderToUTF8StreamTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: db/derby/code/trunk/java/testing/org/apache/derbyTesting/unitTests/junit/_Suite.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/testing/org/apache/derbyTesting/unitTests/junit/_Suite.java?rev=762384&r1=762383&r2=762384&view=diff
==============================================================================
--- db/derby/code/trunk/java/testing/org/apache/derbyTesting/unitTests/junit/_Suite.java (original)
+++ db/derby/code/trunk/java/testing/org/apache/derbyTesting/unitTests/junit/_Suite.java Mon Apr  6 15:13:44 2009
@@ -57,6 +57,7 @@
         suite.addTest(BlockedByteArrayTest.suite());
         suite.addTest(PathUtilTest.suite());
         suite.addTest(VirtualFileTest.suite());
+        suite.addTest(ReaderToUTF8StreamTest.suite());
 
         return suite;
     }