You are viewing a plain text version of this content. The canonical link for it is here.
Posted to derby-commits@db.apache.org by kr...@apache.org on 2009/04/06 17:13:45 UTC
svn commit: r762384 - in /db/derby/code/trunk/java:
engine/org/apache/derby/iapi/types/ engine/org/apache/derby/loc/
shared/org/apache/derby/shared/common/reference/
testing/org/apache/derbyTesting/unitTests/junit/
Author: kristwaa
Date: Mon Apr 6 15:13:44 2009
New Revision: 762384
URL: http://svn.apache.org/viewvc?rev=762384&view=rev
Log:
DERBY-4122: java/testing/org/apache/derbyTesting/unitTests/junit/ReaderToUTF8StreamTest.java.
Added mark/reset functionality to ReaderToUTF8Stream.
Made SQLClob use mark/reset to rewind the data stream when too many bytes are
read as part of the stream header parsing. This happens when reading Clobs
written with the pre-10.5 header format, either in soft or hard upgraded
databases.
Added a new error message.
Added unit tests for mark/reset.
Patch file: derby-4122-4c-classcast_fix_mark_reset.diff
Added:
db/derby/code/trunk/java/testing/org/apache/derbyTesting/unitTests/junit/ReaderToUTF8StreamTest.java (with props)
Modified:
db/derby/code/trunk/java/engine/org/apache/derby/iapi/types/ReaderToUTF8Stream.java
db/derby/code/trunk/java/engine/org/apache/derby/iapi/types/SQLClob.java
db/derby/code/trunk/java/engine/org/apache/derby/loc/messages.xml
db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/MessageId.java
db/derby/code/trunk/java/testing/org/apache/derbyTesting/unitTests/junit/_Suite.java
Modified: db/derby/code/trunk/java/engine/org/apache/derby/iapi/types/ReaderToUTF8Stream.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/iapi/types/ReaderToUTF8Stream.java?rev=762384&r1=762383&r2=762384&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/iapi/types/ReaderToUTF8Stream.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/iapi/types/ReaderToUTF8Stream.java Mon Apr 6 15:13:44 2009
@@ -30,6 +30,7 @@
import org.apache.derby.iapi.services.io.DerbyIOException;
import org.apache.derby.iapi.services.io.LimitReader;
import org.apache.derby.iapi.services.sanity.SanityManager;
+import org.apache.derby.shared.common.reference.MessageId;
/**
* Converts the characters served by a {@code java.io.Reader} to a stream
@@ -39,6 +40,7 @@
* Length validation is performed. If required and allowed by the target column
* type, truncation of blanks will also be performed.
*/
+//@NotThreadSafe
public final class ReaderToUTF8Stream
extends InputStream
{
@@ -50,13 +52,22 @@
/** Constant indicating the first iteration of {@code fillBuffer}. */
private final static int FIRST_READ = Integer.MIN_VALUE;
/**
- * Size of buffer to hold the data read from stream and converted to the
- * modified UTF-8 format.
+ * Constant indicating that no mark is set in the stream, or that the read
+ * ahead limit of the mark has been exceeded.
*/
- private final static int BUFSIZE = 32768;
- private byte[] buffer = new byte[BUFSIZE];
+ private final static int MARK_UNSET_OR_EXCEEDED = -1;
+ /**
+ * Buffer to hold the data read from stream and converted to the modified
+ * UTF-8 format. The initial size is 32 KB, but it may grow if the
+ * {@linkplain #mark(int)} is invoked.
+ */
+ private byte[] buffer = new byte[32*1024];
private int boff;
private int blen = -1;
+ /** Stream mark, set through {@linkplain #mark(int)}. */
+ private int mark = MARK_UNSET_OR_EXCEEDED;
+ /** Read ahead limit for mark, set through {@linkplain #mark(int)}. */
+ private int readAheadLimit;
private boolean eof;
/** Tells if the stream content is/was larger than the buffer size. */
private boolean multipleBuffer;
@@ -298,11 +309,42 @@
// Make startingOffset point at the first byte after the header.
startingOffset = headerLength;
}
- int off = boff = startingOffset;
+ int off = startingOffset;
+ // In the case of a mark, the offset may be adjusted.
+ // Do not change boff in the encoding loop. Before the encoding loop
+ // starts, it shall point at the next byte the stream will deliver on
+ // the next iteration of read or skip.
+ boff = 0;
if (off == 0)
multipleBuffer = true;
+ // If we have a mark set, see if we have to expand the buffer, or if we
+ // are going to read past the read ahead limit and can invalidate the
+ // mark and discard the data currently in the buffer.
+ if (mark >= 0) {
+ // Add 6 bytes reserved for one 3 byte character encoding and the
+ // 3 byte Derby EOF marker (see encoding loop further down).
+ int spaceRequired = readAheadLimit + 6;
+ if (mark + spaceRequired > buffer.length) {
+ if (blen != -1) {
+ // Calculate the new offset, as we may have to shift bytes
+ // we have already delivered to the left.
+ boff = off = blen - mark;
+ }
+ byte[] oldBuf = buffer;
+ if (spaceRequired > buffer.length) {
+ // We have to allocate a bigger buffer to save the bytes.
+ buffer = new byte[spaceRequired];
+ }
+ System.arraycopy(oldBuf, mark, buffer, 0, off);
+ mark = 0;
+ } else if (blen != -1) {
+ // Invalidate the mark.
+ mark = MARK_UNSET_OR_EXCEEDED;
+ }
+ }
+
// 6! need to leave room for a three byte UTF8 encoding
// and 3 bytes for our special end of file marker.
for (; off <= buffer.length - 6; )
@@ -332,8 +374,6 @@
}
blen = off;
- boff = 0;
-
if (eof)
checkSufficientData();
}
@@ -487,6 +527,63 @@
// from the reader object
// reader.getLimit() returns the remaining bytes available
// on this stream
- return (BUFSIZE > remainingBytes ? remainingBytes : BUFSIZE);
+ return (buffer.length > remainingBytes ? remainingBytes : buffer.length);
+ }
+
+ /**
+ * Marks the current position in the stream.
+ * <p>
+ * Note that this stream is not marked at position zero by default (i.e.
+ * in the constructor).
+ *
+ * @param readAheadLimit the maximum limit of bytes that can be read before
+ * the mark position becomes invalid
+ */
+ public void mark(int readAheadLimit) {
+ if (readAheadLimit > 0) {
+ this.readAheadLimit = readAheadLimit;
+ mark = boff;
+ } else {
+ this.readAheadLimit = mark = MARK_UNSET_OR_EXCEEDED;
+ }
+ }
+
+ /**
+ * Repositions this stream to the position at the time the mark method was
+ * last called on this input stream.
+ *
+ * @throws EOFException if the stream has been closed
+ * @throws IOException if no mark has been set, or the read ahead limit of
+ * the mark has been exceeded
+ */
+ public void reset()
+ throws IOException {
+ // Throw execption if the stream has been closed implicitly or
+ // explicitly.
+ if (buffer == null) {
+ throw new EOFException(MessageService.getTextMessage
+ (SQLState.STREAM_EOF));
+ }
+ // Throw exception if the mark hasn't been set, or if we had to refill
+ // the internal buffer after we had read past the read ahead limit.
+ if (mark == MARK_UNSET_OR_EXCEEDED) {
+ throw new IOException(MessageService.getTextMessage(
+ MessageId.STREAM_MARK_UNSET_OR_EXCEEDED));
+ }
+ // Reset successful, adjust state.
+ boff = mark;
+ readAheadLimit = mark = MARK_UNSET_OR_EXCEEDED;
+ }
+
+ /**
+ * Tests if this stream supports mark/reset.
+ * <p>
+ * The {@code markSupported} method of {@code ByteArrayInputStream} always
+ * returns {@code true}.
+ *
+ * @return {@code true}, mark/reset is always supported.
+ */
+ public boolean markSupported() {
+ return true;
}
}
Modified: db/derby/code/trunk/java/engine/org/apache/derby/iapi/types/SQLClob.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/iapi/types/SQLClob.java?rev=762384&r1=762383&r2=762384&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/iapi/types/SQLClob.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/iapi/types/SQLClob.java Mon Apr 6 15:13:44 2009
@@ -32,6 +32,8 @@
import org.apache.derby.iapi.services.sanity.SanityManager;
import org.apache.derby.iapi.util.UTF8Util;
+import org.apache.derby.shared.common.reference.SQLState;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInput;
@@ -331,6 +333,11 @@
// Assume new header format, adjust later if necessary.
byte[] header = new byte[MAX_STREAM_HEADER_LENGTH];
int read = stream.read(header);
+ // Expect at least two header bytes.
+ if (SanityManager.DEBUG) {
+ SanityManager.ASSERT(read > 1,
+ "Too few header bytes: " + read);
+ }
HeaderInfo hdrInfo = investigateHeader(header, read);
if (read > hdrInfo.headerLength()) {
// We have read too much. Reset the stream.
@@ -346,6 +353,21 @@
byteLength(hdrInfo.byteLength()).
charLength(hdrInfo.charLength()).build();
} catch (IOException ioe) {
+ // Check here to see if the root cause is a container closed
+ // exception. If so, this most likely means that the Clob was
+ // accessed after a commit or rollback on the connection.
+ Throwable rootCause = ioe;
+ while (rootCause.getCause() != null) {
+ rootCause = rootCause.getCause();
+ }
+ if (rootCause instanceof StandardException) {
+ StandardException se = (StandardException)rootCause;
+ if (se.getMessageId().equals(
+ SQLState.DATA_CONTAINER_CLOSED)) {
+ throw StandardException.newException(
+ SQLState.BLOB_ACCESSED_AFTER_COMMIT, ioe);
+ }
+ }
throwStreamingIOException(ioe);
}
}
@@ -512,11 +534,19 @@
long vcl = vc.length();
if (vcl < 0L || vcl > Integer.MAX_VALUE)
throw this.outOfRange();
-
- ReaderToUTF8Stream utfIn = new ReaderToUTF8Stream(
- vc.getCharacterStream(), (int) vcl, 0, TypeId.CLOB_NAME,
- getStreamHeaderGenerator());
- setValue(utfIn, (int) vcl);
+ // For small values, just materialize the value.
+ // NOTE: Using streams for the empty string ("") isn't supported
+ // down this code path when in soft upgrade mode, because the code
+ // reading the header bytes ends up reading zero bytes (i.e., it
+ // doesn't get the header / EOF marker).
+ if (vcl < 32*1024) {
+ setValue(vc.getSubString(1, (int)vcl));
+ } else {
+ ReaderToUTF8Stream utfIn = new ReaderToUTF8Stream(
+ vc.getCharacterStream(), (int) vcl, 0, TypeId.CLOB_NAME,
+ getStreamHeaderGenerator());
+ setValue(utfIn, (int) vcl);
+ }
} catch (SQLException e) {
throw dataTypeConversion("DAN-438-tmp");
}
@@ -658,17 +688,42 @@
// Make sure the stream is correctly positioned.
rewindStream(hdrLen);
} else {
+ final boolean markSet = stream.markSupported();
+ if (markSet) {
+ stream.mark(MAX_STREAM_HEADER_LENGTH);
+ }
byte[] header = new byte[MAX_STREAM_HEADER_LENGTH];
int read = in.read(header);
+ // Expect at least two header bytes.
+ if (SanityManager.DEBUG) {
+ SanityManager.ASSERT(read > 1, "Too few header bytes: " + read);
+ }
hdrInfo = investigateHeader(header, read);
if (read > hdrInfo.headerLength()) {
// We read too much data, reset and position on the first byte
// of the user data.
- rewindStream(hdrInfo.headerLength());
+ // First see if we set a mark on the stream and can reset it.
+ // If not, try using the Resetable interface.
+ if (markSet) {
+ // Stream is not a store Resetable one, use mark/reset
+ // functionality instead.
+ stream.reset();
+ InputStreamUtil.skipFully(stream, hdrInfo.headerLength());
+ } else if (stream instanceof Resetable) {
+ // We have a store stream.
+ rewindStream(hdrInfo.headerLength());
+ }
}
}
// The data will be materialized in memory, in a char array.
- super.readExternal(in, hdrInfo.byteLength(), hdrInfo.charLength());
+ // Subtract the header length from the byte length if there is a byte
+ // encoded in the header, otherwise the decode routine will try to read
+ // too many bytes.
+ int byteLength = 0; // zero is interpreted as unknown / unset
+ if (hdrInfo.byteLength() != 0) {
+ byteLength = hdrInfo.byteLength() - hdrInfo.headerLength();
+ }
+ super.readExternal(in, byteLength, hdrInfo.charLength());
}
/**
@@ -686,6 +741,10 @@
int prevPos = in.getPosition();
byte[] header = new byte[MAX_STREAM_HEADER_LENGTH];
int read = in.read(header);
+ // Expect at least two header bytes.
+ if (SanityManager.DEBUG) {
+ SanityManager.ASSERT(read > 1, "Too few header bytes: " + read);
+ }
HeaderInfo hdrInfo = investigateHeader(header, read);
if (read > hdrInfo.headerLength()) {
// Reset stream. This path will only be taken for Clobs stored
Modified: db/derby/code/trunk/java/engine/org/apache/derby/loc/messages.xml
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/loc/messages.xml?rev=762384&r1=762383&r2=762384&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/loc/messages.xml (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/loc/messages.xml Mon Apr 6 15:13:44 2009
@@ -7401,6 +7401,11 @@
<arg>errorMessage</arg>
</msg>
+ <msg>
+ <name>I027</name>
+ <text>No mark set, or mark read ahead limit exceeded.</text>
+ </msg>
+
</family>
Modified: db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/MessageId.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/MessageId.java?rev=762384&r1=762383&r2=762384&view=diff
==============================================================================
--- db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/MessageId.java (original)
+++ db/derby/code/trunk/java/shared/org/apache/derby/shared/common/reference/MessageId.java Mon Apr 6 15:13:44 2009
@@ -184,6 +184,11 @@
String CORE_DATABASE_NOT_AVAILABLE = "I024"; // Database not available
String CORE_DRIVER_NOT_AVAILABLE = "I025"; // JDBC Driver not available
String JDBC_DRIVER_REGISTER_ERROR = "I026"; // Error while registering driver
+ /**
+ * At the time InputStream.reset was invoked, either no mark was set or the
+ * read ahead limit of the mark was exceeded.
+ */
+ String STREAM_MARK_UNSET_OR_EXCEEDED = "I027";
/*
* Monitor
Added: db/derby/code/trunk/java/testing/org/apache/derbyTesting/unitTests/junit/ReaderToUTF8StreamTest.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/testing/org/apache/derbyTesting/unitTests/junit/ReaderToUTF8StreamTest.java?rev=762384&view=auto
==============================================================================
--- db/derby/code/trunk/java/testing/org/apache/derbyTesting/unitTests/junit/ReaderToUTF8StreamTest.java (added)
+++ db/derby/code/trunk/java/testing/org/apache/derbyTesting/unitTests/junit/ReaderToUTF8StreamTest.java Mon Apr 6 15:13:44 2009
@@ -0,0 +1,686 @@
+/*
+
+ Derby - Class org.apache.derbyTesting.unitTests.junit.ReaderToUTF8StreamTest
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to you under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+ */
+
+package org.apache.derbyTesting.unitTests.junit;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
+import java.util.Arrays;
+import java.util.Random;
+import junit.framework.Test;
+import junit.framework.TestSuite;
+import org.apache.derby.iapi.services.io.InputStreamUtil;
+import org.apache.derby.iapi.types.ClobStreamHeaderGenerator;
+import org.apache.derby.iapi.types.ReaderToUTF8Stream;
+import org.apache.derbyTesting.functionTests.util.streams.CharAlphabet;
+import org.apache.derbyTesting.functionTests.util.streams.LoopingAlphabetReader;
+import org.apache.derbyTesting.junit.BaseTestCase;
+
+/**
+ * Unit tests for ReaderToUTF8Stream.
+ * <p>
+ * Explicit tests for the mark/reset feature start with "testMark".
+ */
+public class ReaderToUTF8StreamTest
+ extends BaseTestCase {
+
+ /**
+ * The default size of the internal buffer in ReaderToUTF8Stream. Used to
+ * trigger specific events in the reader.
+ */
+ private static int DEFAULT_INTERNAL_BUFFER_SIZE = 32*1024;
+
+ public ReaderToUTF8StreamTest(String name) {
+ super(name);
+ }
+
+ public static Test suite() {
+ return new TestSuite(ReaderToUTF8StreamTest.class);
+ }
+
+ /**
+ * Tests a very basic use of the mark/reset mechanism.
+ *
+ * @throws IOException if something goes wrong
+ */
+ public void testMarkResetSimplePosZero()
+ throws IOException {
+ InputStream is = getStream(100);
+ is.mark(10);
+ assertEquals(10, is.read(new byte[10]));
+ is.reset();
+ checkBeginningOfStream(is);
+ }
+
+ /**
+ * Tests a very basic use of the mark/reset mechanism.
+ *
+ * @throws IOException if something goes wrong
+ */
+ public void testMarkResetSimplePosNonZero()
+ throws IOException {
+ InputStream is = getStream(200);
+ assertEquals(127, is.read(new byte[127]));
+ is.mark(10);
+ byte[] readBeforeReset = new byte[10];
+ byte[] readAfterReset = new byte[10];
+ assertEquals(10, is.read(readBeforeReset));
+ is.reset();
+ assertEquals(10, is.read(readAfterReset));
+ assertTrue(Arrays.equals(readBeforeReset, readAfterReset));
+ }
+
+ /**
+ * Tests that shifting of existing bytes works.
+ *
+ * @throws IOException if something goes wrong
+ */
+ public void testMarkResetShiftBytesFew_Internal()
+ throws IOException {
+ InputStream is = getStream(128*1024);
+ byte[] buf = new byte[DEFAULT_INTERNAL_BUFFER_SIZE - 2*1024];
+ fillArray(is, buf);
+ // The following mark fits within the existing default buffer, but the
+ // bytes after the mark have to be shifted to the left.
+ is.mark(4*1024);
+ byte[] readBeforeReset = new byte[3*1024];
+ byte[] readAfterReset = new byte[3*1024];
+ fillArray(is, readBeforeReset);
+ // Obtain something to compare with.
+ InputStream src = getStream(128*1024);
+ InputStreamUtil.skipFully(src, DEFAULT_INTERNAL_BUFFER_SIZE - 2*1024);
+ byte[] comparisonRead = new byte[3*1024];
+ fillArray(src, comparisonRead);
+ // Compare
+ assertEquals(new ByteArrayInputStream(comparisonRead),
+ new ByteArrayInputStream(readBeforeReset));
+ // Reset the stream.
+ is.reset();
+ fillArray(is, readAfterReset);
+ assertEquals(new ByteArrayInputStream(readBeforeReset),
+ new ByteArrayInputStream(readAfterReset));
+ }
+
+ /**
+ * Tests that shifting of existing bytes works.
+ *
+ * @throws IOException if something goes wrong
+ */
+ public void testMarkResetShiftBytesMany_Internal()
+ throws IOException {
+ InputStream is = getStream(128*1024);
+ is.read();
+ is.read();
+ // The following mark fits within the existing default buffer, but the
+ // bytes after the mark have to be shifted to the left.
+ is.mark(DEFAULT_INTERNAL_BUFFER_SIZE -6);
+ byte[] readBeforeReset = new byte[DEFAULT_INTERNAL_BUFFER_SIZE -6];
+ byte[] readAfterReset = new byte[DEFAULT_INTERNAL_BUFFER_SIZE -6];
+ fillArray(is, readBeforeReset);
+ // Obtain something to compare with.
+ InputStream src = getStream(128*1024);
+ src.read();
+ src.read();
+ byte[] comparisonRead = new byte[DEFAULT_INTERNAL_BUFFER_SIZE -6];
+ fillArray(src, comparisonRead);
+ // Compare
+ assertEquals(new ByteArrayInputStream(comparisonRead),
+ new ByteArrayInputStream(readBeforeReset));
+ // Reset the stream.
+ is.reset();
+ fillArray(is, readAfterReset);
+ assertEquals(new ByteArrayInputStream(readBeforeReset),
+ new ByteArrayInputStream(readAfterReset));
+ }
+
+ /**
+ * Tests an implementation specific feature of ReaderToUTF8Stream, which is
+ * that the mark isn't invalidated even though we read past the read ahead
+ * limit, given that the internal buffer doesn't have to be refilled.
+ * <p>
+ * <em>WARNING</em>:This implementation specific feature should not be
+ * relied on by the production code! It may change at any time.
+ *
+ * @throws IOException if something goes wrong
+ */
+ public void testMarkResetExceedReadAheadLimitOK_Internal()
+ throws IOException {
+ InputStream is = getStream(4*1024+17);
+ is.mark(10);
+ assertEquals(20, is.read(new byte[20]));
+ // Note the following is implementation dependent.
+ // Since the bytes are already stored in the internal buffer, we won't
+ // fail the reset even though we have exceeded the read ahead limit.
+ // With a different stream implementation, this may fail!
+ is.reset();
+ }
+
+ /**
+ * Tests that the reset-call will fail we exceed the mark ahead limit and
+ * the internal buffer has to be refilled.
+ *
+ * @throws IOException if something goes wrong
+ */
+ public void testMarkResetExceedReadAheadLimitFail_Internal()
+ throws IOException {
+ InputStream is = getStream(64*1024+17);
+ is.mark(10);
+ // The internal buffer is 32 KB (implementation detail).
+ int toRead = 38*1024+7;
+ int read = 0;
+ byte[] buf = new byte[toRead];
+ while (read < toRead) {
+ read += is.read(buf, read, toRead - read);
+ }
+ // Note the following is implementation dependent.
+ try {
+ is.reset();
+ fail("reset-call was expected to throw IOException");
+ } catch (IOException ioe) {
+ // As expected, do nothing
+ }
+ }
+
+ /**
+ * Reads almost enough bytes to read past the read ahead limit, then tests
+ * that the reset works. After that, reads past the read ahead limit and
+ * tests that the reset fails.
+ *
+ * @throws IOException if something goes wrong
+ */
+ public void testMarkResetOverflowInternalBufferKeepBytes()
+ throws IOException {
+ InputStream is = getStream(128*1024);
+ is.mark(120*1024);
+ byte[] buf = new byte[120*1024-1];
+ fillArray(is, buf);
+ is.reset();
+ checkBeginningOfStream(is);
+
+ // Again, but this time read past the read ahead limit.
+ is = getStream(36*1024);
+ is.mark(4*1024);
+ buf = new byte[36*1024-1];
+ fillArray(is, buf);
+ try {
+ is.reset();
+ fail("reset-call was expected to throw IOException");
+ } catch (IOException ioe) {
+ // As expected, do nothing
+ }
+ }
+
+ /**
+ * Marks the stream with a read ahead limit larger than the stream itself,
+ * then reads until the end of the stream.
+ * <p>
+ * The current implementation does not allow the stream to be reset after
+ * the last byte in the stream has been read once.
+ *
+ * @throws IOException if something goes wrong
+ */
+ public void testMarkReadUntilEOF()
+ throws IOException {
+ // Try with a single buffer fill first.
+ InputStream is = getStream(4*1024);
+ is.mark(8*1024);
+ byte[] buf = new byte[8*1024];
+ int read = 0;
+ while (true) {
+ int readNow = is.read(buf, read, buf.length - read);
+ if (readNow == -1) {
+ break;
+ }
+ read += readNow;
+ }
+ try {
+ is.reset();
+ fail("reset-call was expected to throw IOException");
+ } catch (IOException ioe) {
+ // The current implementation does not allow resetting the stream
+ // when the source stream itself has been drained and all the data
+ // has been read once.
+ }
+
+ // Now try with multiple buffer fills.
+ is = getStream(640*1024);
+ is.mark(128*1024);
+ buf = new byte[8*1024];
+ while (true) {
+ // Just drain the stream.
+ if (is.read(buf, 0, buf.length) == -1) {
+ break;
+ }
+ }
+ try {
+ is.reset();
+ fail("reset-call was expected to throw IOException");
+ } catch (IOException ioe) {
+ // The current implementation does not allow resetting the stream
+ // when the source stream itself has been drained and all the data
+ // has been read once.
+ }
+ }
+
+ /**
+ * Marks the stream with a read ahead limit larger than the stream itself,
+ * then reads until just before the end of the stream.
+ *
+ * @throws IOException if something goes wrong
+ */
+ public void testMarkReadAlmostUntilEOF()
+ throws IOException {
+ // Try with a single buffer fill first.
+ int limit = 4*1024;
+ InputStream is = getStream(limit);
+ is.mark(8*1024);
+ byte[] buf = new byte[limit*2];
+ int read = 0;
+ while (read < limit -1) {
+ int readNow = is.read(buf, read, (limit -1) - read);
+ if (readNow == -1) {
+ break;
+ }
+ read += readNow;
+ }
+ // EOF has been reached when filling the internal buffer, but we still
+ // havent't read it. Therefore, the reset should succeed.
+ is.reset();
+ checkBeginningOfStream(is);
+ }
+
+ /**
+ * Makes sure that the header bytes are copied when creating a new buffer
+ * to hold all the required bytes when the stream has been marked.
+ * This will only happen the first time the buffer is filled, i.e. when the
+ * stream is marked before the first read (mark at position zero).
+ *
+ * @throws IOException if something goes wrong
+ */
+ public void testHeaderPresentInStream_Internal()
+ throws IOException {
+ final int valueLen = DEFAULT_INTERNAL_BUFFER_SIZE + 5*1024;
+ InputStream is = getStream(valueLen);
+ is.mark(valueLen - 1024);
+ // Obtain a header generator to compare with.
+ ClobStreamHeaderGenerator hdrGen = new ClobStreamHeaderGenerator(false);
+ byte[] hdrTmp = new byte[100];
+ int headerLen = hdrGen.generateInto(hdrTmp, 0, valueLen);
+ byte[] hdr1 = new byte[headerLen];
+ System.arraycopy(hdrTmp, 0, hdr1, 0, headerLen);
+ byte[] hdr2 = new byte[headerLen];
+ // Get the first bytes from the stream being tested.
+ assertEquals(headerLen, is.read(hdr2));
+ assertEquals(new ByteArrayInputStream(hdr1),
+ new ByteArrayInputStream(hdr2));
+ }
+
+ /**
+ * Returns a stream to test, loaded with the repeating modern latin
+ * lowercase alphabet.
+ *
+ * @param length the length of the stream in characters
+ * @return A stream serving bytes.
+ */
+ private InputStream getStream(int length) {
+ Reader src = new LoopingAlphabetReader(length,
+ CharAlphabet.modernLatinLowercase());
+ InputStream is = new ReaderToUTF8Stream(
+ src, length, 0, "CLOB", new ClobStreamHeaderGenerator(false));
+ assertTrue("The stream doesn't support mark/reset", is.markSupported());
+ return is;
+ }
+
+ /**
+ * Checks the beginning of the stream, which is expected to consist of five
+ * header bytes (skipped) followed by the bytes for the characters 'a' and
+ * 'b'.
+ *
+ * @param is the stream to check
+ * @throws IOException if reading from the stream fails
+ * @throws AssertionFailedError if the stream content isn't as expected
+ */
+ private void checkBeginningOfStream(InputStream is)
+ throws IOException {
+ assertEquals(5, is.skip(5));
+ // We should now get the character a, followed by b.
+ assertEquals((byte)'a', is.read());
+ assertEquals((byte)'b', is.read());
+ }
+
+ /**
+ * Fills the array by reading from the stream.
+ *
+ * @param is input stream to read from
+ * @param b array to fill with bytes from the stream
+ * @throws IOException if reading from the array fails, or the end of the
+ * stream is reached
+ */
+ private void fillArray(InputStream is, byte[] b)
+ throws IOException {
+ final int toRead = b.length;
+ int read = 0;
+ while (read < toRead) {
+ int readNow = is.read(b, read, toRead - read);
+ assertTrue("reached EOF", readNow != -1);
+ read += readNow;
+ }
+ }
+
+ /**
+ * Performs a series of random operations on a {@code ReaderToUTF8Stream},
+ * consisting of read, skip, mark, reset and a noop.
+ * <p>
+ * <em>Note</em>: Turn on debugging (derby.tests.debug=true) to see some
+ * information, turn on tracing (derby.tests.trace=true) in addition to see
+ * a lot more information.
+ * <p>
+ * If the test fails, the seed will be reported in the error message, and
+ * the load that failed can be rerun.
+ *
+ * @throws IOException if the test fails
+ */
+ public void testRandomSequence()
+ throws IOException {
+ final long seed = System.currentTimeMillis();
+ try {
+ testRandomSequence(seed);
+ } catch (IOException ioe) {
+ // Report the seed for repeatability.
+ IOException wrapper = new IOException("seed=" + seed);
+ wrapper.initCause(ioe);
+ throw wrapper;
+ }
+ }
+
+ /**
+ * Performs a series of random operations on a {@code ReaderToUTF8Stream},
+ * consisting of read, skip, mark, reset and a noop.
+ * <p>
+ * Note that this test verifies that executing the operations don't fail,
+ * but it doesn't verify that the bytes obtained from the stream are the
+ * correct ones.
+ *
+ * @param seed seed controlling the test load
+ * @throws IOException if the test fails
+ */
+ private void testRandomSequence(long seed)
+ throws IOException {
+ println("testRandomSequence seed: " + seed);
+ final int iterations = 100;
+ final Random rng = new Random(seed);
+ for (int i=0; i < iterations; i++) {
+ // Operation counters.
+ int reads = 0, skips = 0, resets = 0, marks = 0, invalidations = 0;
+ // Stream length (up to ~1 MB).
+ int length = 1024*rng.nextInt(1024) + rng.nextInt(1024);
+ boolean rs = rng.nextBoolean();
+ println(">>> iteration " + i + ", length=" + length);
+ int currentPos = 0;
+ int limit = 0;
+ int mark = -1;
+ InputStream is = getStream(length);
+ int ops = 0;
+ while (ops < 200 && currentPos < length - 10) {
+ if (rng.nextBoolean()) { // Whether to read/skip or mark/reset.
+ int toRead = getRandomLength(currentPos, length, rng, rs);
+ if (rng.nextBoolean()) {
+ // Read
+ mytrace("\treading " + toRead + " bytes");
+ reads++;
+ is.read(new byte[toRead]);
+ } else {
+ // Skip
+ mytrace("\tskipping " + toRead + " bytes");
+ skips++;
+ is.skip(toRead);
+ }
+ currentPos += toRead;
+ if (mark != -1 && (currentPos - mark) > limit) {
+ mytrace("\t\tmark invalidated");
+ invalidations++;
+ mark = -1;
+ limit = 0;
+ }
+ }
+ if (rng.nextBoolean()) { // Whether to read/skip or mark/reset.
+ // Mark/reset, or do nothing.
+ if (rng.nextBoolean()) {
+ if (rng.nextInt(100) < 40 && mark != -1) {
+ // Reset
+ mytrace("\tresetting to position " + mark);
+ resets++;
+ is.reset();
+ currentPos = mark;
+ mark = -1;
+ } else {
+ // Mark
+ limit = getRandomLength(currentPos, length, rng);
+ mytrace("\tmarking position " + currentPos +
+ " with limit " + limit);
+ marks++;
+ mark = currentPos;
+ is.mark(limit);
+ }
+ }
+ }
+ ops++;
+ }
+ println("ops=" + ops + ", reads=" + reads + ", skips=" + skips +
+ ", marks=" + marks + ", resets=" + resets +
+ ", invalidations=" + invalidations);
+ }
+ }
+
+
+ /**
+ * Returns a random length within the limits.
+ * <p>
+ * This call will operate in the full range of the remaining bytes.
+ *
+ * @param currentPos the current position of the stream
+ * @param length the length of the stream
+ * @param rng random generator
+ * @return A random length within the limits of the stream.
+ */
+ private int getRandomLength(int currentPos, int length, Random rng) {
+ return getRandomLength(currentPos, length, rng, false);
+ }
+
+ /**
+ * Returns a random length within the limits.
+ *
+ * @param currentPos the current position of the stream
+ * @param length the length of the stream
+ * @param rng random generator
+ * @param reducedSize whether to return smaller number or not
+ * (setting to true may increase the number of operations that will be
+ * performed on a stream before it is exhausted)
+ * @return A random length within the limits of the stream.
+ */
+ private int getRandomLength(int currentPos, int length, Random rng, boolean reducedSize) {
+ int max = length - currentPos;
+ if (reducedSize) {
+ max = max / 5;
+ }
+ return (1 + (int)(max * rng.nextFloat()));
+ }
+
+ /**
+ * Trace only if both trace and verbose is true in the test configuration.
+ *
+ * @param str the string to print
+ */
+ private void mytrace(String str) {
+ if (getTestConfiguration().isVerbose()) {
+ traceit(str);
+ }
+ }
+
+ /**
+ * Tests mark/reset functionality by comparing with
+ * {@code ByteArrayInputStream}.
+ *
+ * @throws IOException if the test fails
+ */
+ public void testMarkReset1()
+ throws IOException {
+ InputStream is = getStream(64*1024);
+ byte[] srcBuf = new byte[64*1024+5];
+ fillArray(is, srcBuf);
+ InputStream src = new ByteArrayInputStream(srcBuf);
+ // Reinitialize the stream.
+ is = getStream(64*1024);
+
+ StreamUtil su = new StreamUtil(is, src);
+ su.mark(1024);
+ su.skip(17);
+ su.reset();
+ su.read(1);
+ su.read(2133);
+ su.mark(1024);
+ su.reset();
+ su.mark(1024);
+ su.skip(18);
+ su.read(1024);
+ }
+
+ /**
+ * Tests mark/reset functionality by comparing with
+ * {@code ByteArrayInputStream}. This test relies on knowing the size of
+ * the internal buffer to force a shifting of existing bytes to take place.
+ *
+ * @throws IOException if the test fails
+ */
+ public void testMarkReset2_Internal()
+ throws IOException {
+ InputStream is = getStream(128*1024);
+ byte[] srcBuf = new byte[128*1024+5];
+ fillArray(is, srcBuf);
+ InputStream src = new ByteArrayInputStream(srcBuf);
+ // Reinitialize the stream.
+ is = getStream(128*1024);
+
+ StreamUtil su = new StreamUtil(is, src);
+ su.skip(DEFAULT_INTERNAL_BUFFER_SIZE);
+ su.mark(DEFAULT_INTERNAL_BUFFER_SIZE + 2*1024);
+ su.read(1024);
+ su.reset();
+ su.read(3*1024);
+ }
+
+ /**
+ * Utility class executing a few selected method calls on two streams,
+ * expecting both of them to behave in the same way.
+ */
+ private class StreamUtil {
+ private final InputStream is1;
+ private final InputStream is2;
+
+ StreamUtil(InputStream is1, InputStream is2) {
+ assertNotNull(is1);
+ assertNotNull(is2);
+ this.is1 = is1;
+ this.is2 = is2;
+ }
+
+ public void mark(int readAheadLimit) {
+ is1.mark(readAheadLimit);
+ is2.mark(readAheadLimit);
+ }
+
+ public void reset()
+ throws IOException {
+ is1.reset();
+ is2.reset();
+ }
+
+ public long skip(long skip)
+ throws IOException {
+ long skip1 = 0;
+ long skip2 = 0;
+ // Skip data in the first stream.
+ while (skip1 < skip) {
+ long skippedNow = is1.skip(skip - skip1);
+ if (skippedNow == -1) {
+ fail("stream one reached EOF: " + is1.getClass());
+ }
+ skip1 += skippedNow;
+ }
+ // Skip data in the second stream.
+ while (skip2 < skip) {
+ long skippedNow = is2.skip(skip - skip2);
+ if (skippedNow == -1) {
+ fail("stream two reached EOF: " + is2.getClass());
+ }
+ skip2 += skippedNow;
+ }
+ assertEquals(skip1, skip2);
+ return skip1;
+ }
+
+ public int read(int toRead)
+ throws IOException {
+ byte[] b1 = new byte[toRead];
+ byte[] b2 = new byte[toRead];
+ int read = read(b1, b2, false);
+ assertEquals(new ByteArrayInputStream(b1),
+ new ByteArrayInputStream(b2));
+ return read;
+ }
+
+ public int read(byte[] b1, byte[] b2, boolean expectEOF)
+ throws IOException {
+ assertEquals("unequal sized arrays", b1.length, b2.length);
+ int read1 = 0;
+ int read2 = 0;
+ final int toRead = b1.length;
+ // Read from the first stream.
+ while (read1 < toRead) {
+ int readNow = is1.read(b1, read1, toRead - read1);
+ if (readNow == -1) {
+ if (expectEOF) {
+ break;
+ } else {
+ fail("stream one reached EOF: " + is1.getClass());
+ }
+ }
+ read1 += readNow;
+ }
+ // Read from the second stream.
+ while (read2 < toRead) {
+ int readNow = is2.read(b2, read2, toRead - read2);
+ if (readNow == -1) {
+ if (expectEOF) {
+ break;
+ } else {
+ fail("stream two reached EOF: " + is2.getClass());
+ }
+ }
+ read2 += readNow;
+ }
+ assertEquals(read1, read2);
+ return read1;
+ }
+ }
+}
Propchange: db/derby/code/trunk/java/testing/org/apache/derbyTesting/unitTests/junit/ReaderToUTF8StreamTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: db/derby/code/trunk/java/testing/org/apache/derbyTesting/unitTests/junit/_Suite.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/testing/org/apache/derbyTesting/unitTests/junit/_Suite.java?rev=762384&r1=762383&r2=762384&view=diff
==============================================================================
--- db/derby/code/trunk/java/testing/org/apache/derbyTesting/unitTests/junit/_Suite.java (original)
+++ db/derby/code/trunk/java/testing/org/apache/derbyTesting/unitTests/junit/_Suite.java Mon Apr 6 15:13:44 2009
@@ -57,6 +57,7 @@
suite.addTest(BlockedByteArrayTest.suite());
suite.addTest(PathUtilTest.suite());
suite.addTest(VirtualFileTest.suite());
+ suite.addTest(ReaderToUTF8StreamTest.suite());
return suite;
}