You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2009/06/05 20:31:45 UTC
svn commit: r782092 [1/2] - in /hadoop/avro/trunk: ./
src/java/org/apache/avro/file/ src/java/org/apache/avro/generic/
src/java/org/apache/avro/io/ src/java/org/apache/avro/ipc/
src/test/java/org/apache/avro/ src/test/java/org/apache/avro/io/
Author: cutting
Date: Fri Jun 5 18:31:45 2009
New Revision: 782092
URL: http://svn.apache.org/viewvc?rev=782092&view=rev
Log:
AVRO-25. Add blocking value writer. Contributed by Thiruvalluvan M. G.
Added:
hadoop/avro/trunk/src/java/org/apache/avro/io/BlockingValueWriter.java
hadoop/avro/trunk/src/test/java/org/apache/avro/io/TestBlockingIO.java
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java
hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java
hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java
hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumWriter.java
hadoop/avro/trunk/src/java/org/apache/avro/io/ValueReader.java
hadoop/avro/trunk/src/java/org/apache/avro/io/ValueWriter.java
hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferInputStream.java
hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferOutputStream.java
hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferValueReader.java
hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferValueWriter.java
hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java
hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java
hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java
hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGeneric.java
hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java
hadoop/avro/trunk/src/test/java/org/apache/avro/io/TestValueReader.java
Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Fri Jun 5 18:31:45 2009
@@ -29,6 +29,10 @@
AVRO-42. Add partial C++ implementation. (Scott Banachowski via cutting)
+ AVRO-25. Add blocking value writer that permits arbitrarily long
+ arrays and maps to be efficiently written as sequences of blocks.
+ (Thiruvalluvan M. G. via cutting)
+
IMPROVEMENTS
AVRO-11. Re-implement specific and reflect datum readers and
Modified: hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java Fri Jun 5 18:31:45 2009
@@ -19,6 +19,7 @@
import java.io.*;
import java.util.*;
+import java.nio.ByteBuffer;
import java.rmi.server.UID;
import java.security.MessageDigest;
@@ -47,10 +48,9 @@
public DataFileReader(SeekableInput sin, DatumReader<D> reader)
throws IOException {
this.in = new SeekableBufferedInput(sin);
- this.vin = new ValueReader(in);
byte[] magic = new byte[4];
- vin.readBytes(magic);
+ in.read(magic);
if (!Arrays.equals(DataFileWriter.MAGIC, magic))
throw new IOException("Not a data file.");
@@ -58,12 +58,18 @@
in.seek(length-4);
int footerSize=(in.read()<<24)+(in.read()<<16)+(in.read()<<8)+in.read();
in.seek(length-footerSize);
- int metaLength = (int)vin.readLong();
- for (int i = 0; i < metaLength; i++) { // read meta
- String key = vin.readUtf8(null).toString();
- byte[] value = new byte[(int)vin.readLong()];
- vin.readBytes(value);
- meta.put(key, value);
+ this.vin = new ValueReader(in);
+ long l = vin.readMapStart();
+ if (l > 0) {
+ do {
+ for (long i = 0; i < l; i++) {
+ String key = vin.readString(null).toString();
+ ByteBuffer value = vin.readBytes(null);
+ byte[] bb = new byte[value.remaining()];
+ value.get(bb);
+ meta.put(key, bb);
+ }
+ } while ((l = vin.mapNext()) != 0);
}
this.sync = getMeta("sync");
@@ -114,7 +120,7 @@
}
private void skipSync() throws IOException {
- vin.readBytes(syncBuffer);
+ vin.readFixed(syncBuffer);
if (!Arrays.equals(syncBuffer, sync))
throw new IOException("Invalid sync!");
}
@@ -133,7 +139,7 @@
return;
}
in.seek(position);
- vin.readBytes(syncBuffer);
+ vin.readFixed(syncBuffer);
for (int i = 0; in.tell() < in.length(); i++) {
int j = 0;
for (; j < sync.length; j++) {
Modified: hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java Fri Jun 5 18:31:45 2009
@@ -145,12 +145,15 @@
private void writeFooter() throws IOException {
writeBlock(); // flush any data
setMeta("count", count); // update count
- bufOut.writeLong(meta.size()); // write meta entries
+ bufOut.writeMapStart(); // write meta entries
+ bufOut.setItemCount(meta.size());
for (Map.Entry<String,byte[]> entry : meta.entrySet()) {
- bufOut.writeUtf8(new Utf8(entry.getKey()));
- bufOut.writeLong(entry.getValue().length);
- bufOut.write(entry.getValue());
+ bufOut.startItem();
+ bufOut.writeString(entry.getKey());
+ bufOut.writeBytes(entry.getValue());
}
+ bufOut.writeMapEnd();
+
int size = buffer.size()+4;
out.write(sync);
vout.writeLong(FOOTER_BLOCK); // tag the block
Modified: hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java Fri Jun 5 18:31:45 2009
@@ -62,7 +62,7 @@
protected Object read(Object old, Schema actual,
Schema expected, ValueReader in) throws IOException {
if (actual.getType() == Type.UNION) // resolve unions
- actual = actual.getTypes().get((int)in.readLong());
+ actual = actual.getTypes().get((int)in.readIndex());
if (expected.getType() == Type.UNION)
expected = resolveExpected(actual, expected);
switch (actual.getType()) {
@@ -223,7 +223,7 @@
for (Iterator<String> i = json.getFieldNames(); i.hasNext();) {
String key = i.next();
addToMap(map, new Utf8(key),
- defaultFieldValue(null, value, json.getFieldValue(key)));
+ defaultFieldValue(null, value, json.get(key)));
}
return map;
case UNION: return defaultFieldValue(old, schema.getTypes().get(0), json);
@@ -247,7 +247,7 @@
String name = expected.getName();
if (name != null && !name.equals(actual.getName()))
throw new AvroTypeException("Expected "+expected+", found "+actual);
- return createEnum(actual.getEnumSymbols().get(in.readInt()), expected);
+ return createEnum(actual.getEnumSymbols().get(in.readEnum()), expected);
}
/** Called to create an enum value. May be overridden for alternate enum
@@ -256,17 +256,23 @@
/** Called to read an array instance. May be overridden for alternate array
* representations.*/
- @SuppressWarnings(value="unchecked")
protected Object readArray(Object old, Schema actual, Schema expected,
ValueReader in) throws IOException {
Schema actualType = actual.getElementType();
Schema expectedType = expected.getElementType();
- long firstBlockSize = in.readLong();
- Object array = newArray(old, (int) firstBlockSize);
- for (long l = firstBlockSize; l > 0; l = in.readLong())
- for (long i = 0; i < l; i++)
- addToArray(array, read(peekArray(array), actualType, expectedType, in));
- return array;
+ long l = in.readArrayStart();
+ if (l > 0) {
+ Object array = newArray(old, (int) l);
+ do {
+ for (long i = 0; i < l; i++) {
+ addToArray(array, read(peekArray(array), actualType, expectedType, in));
+ }
+ } while ((l = in.arrayNext()) > 0);
+
+ return array;
+ } else {
+ return newArray(old, 0);
+ }
}
/** Called by the default implementation of {@link #readArray} to retrieve a
@@ -286,18 +292,21 @@
/** Called to read a map instance. May be overridden for alternate map
* representations.*/
- @SuppressWarnings(value="unchecked")
protected Object readMap(Object old, Schema actual, Schema expected,
ValueReader in) throws IOException {
Schema aValue = actual.getValueType();
Schema eValue = expected.getValueType();
- int firstBlockSize = (int)in.readLong();
- Object map = newMap(old, firstBlockSize);
- for (long l = firstBlockSize; l > 0; l = in.readLong())
- for (long i = 0; i < l; i++)
- addToMap(map,
- readString(null, in),
- read(null, aValue, eValue, in));
+ long l = in.readMapStart();
+ Object map = newMap(old, (int) l);
+ if (l > 0) {
+ do {
+ for (int i = 0; i < l; i++) {
+ addToMap(map,
+ readString(null, in),
+ read(null, aValue, eValue, in));
+ }
+ } while ((l = in.mapNext()) > 0);
+ }
return map;
}
@@ -316,7 +325,7 @@
if (!actual.equals(expected))
throw new AvroTypeException("Expected "+expected+", found "+actual);
GenericFixed fixed = (GenericFixed)createFixed(old, expected);
- in.readBytes(fixed.bytes(), 0, actual.getFixedSize());
+ in.readFixed(fixed.bytes(), 0, actual.getFixedSize());
return fixed;
}
@@ -356,6 +365,7 @@
/** Called to create new array instances. Subclasses may override to use a
* different array implementation. By default, this returns a {@link
* GenericData.Array}.*/
+ @SuppressWarnings("unchecked")
protected Object newArray(Object old, int size) {
if (old instanceof GenericArray) {
((GenericArray) old).clear();
@@ -366,6 +376,7 @@
/** Called to create new array instances. Subclasses may override to use a
* different map implementation. By default, this returns a {@link
* HashMap}.*/
+ @SuppressWarnings("unchecked")
protected Object newMap(Object old, int size) {
if (old instanceof Map) {
((Map) old).clear();
@@ -375,9 +386,9 @@
/** Called to read strings. Subclasses may override to use a different
* string representation. By default, this calls {@link
- * ValueReader#readUtf8(Object)}.*/
+ * ValueReader#readString(Utf8)}.*/
protected Object readString(Object old, ValueReader in) throws IOException {
- return in.readUtf8(old);
+ return in.readString((Utf8)old);
}
/** Called to create a string from a default value. Subclasses may override
@@ -387,9 +398,9 @@
/** Called to read byte arrays. Subclasses may override to use a different
* byte array representation. By default, this calls {@link
- * ValueReader#readBuffer(Object)}.*/
+ * ValueReader#readBytes(ByteBuffer)}.*/
protected Object readBytes(Object old, ValueReader in) throws IOException {
- return in.readBuffer(old);
+ return in.readBytes((ByteBuffer)old);
}
/** Called to create byte arrays from default values. Subclasses may
@@ -411,29 +422,30 @@
break;
case ARRAY:
Schema elementType = schema.getElementType();
- for (int l = (int)in.readLong(); l > 0; l = (int)in.readLong())
- for (int i = 0; i < l; i++)
+ for (long l = in.skipArray(); l > 0; l = in.skipArray()) {
+ for (long i = 0; i < l; i++) {
skip(elementType, in);
+ }
+ }
break;
case MAP:
Schema value = schema.getValueType();
- for (int l = (int)in.readLong(); l > 0; l = (int)in.readLong())
- for (int i = 0; i < l; i++) {
+ for (long l = in.skipMap(); l > 0; l = in.skipMap()) {
+ for (long i = 0; i < l; i++) {
skip(STRING_SCHEMA, in);
skip(value, in);
}
+ }
break;
case UNION:
- skip(schema.getTypes().get((int)in.readLong()), in);
+ skip(schema.getTypes().get((int)in.readIndex()), in);
break;
case FIXED:
- in.skip(schema.getFixedSize());
+ in.skipFixed(schema.getFixedSize());
break;
case STRING:
case BYTES:
- long length = in.readLong();
- while (length > 0)
- length -= in.skip(length);
+ in.skipBytes();
break;
case INT: in.readInt(); break;
case LONG: in.readLong(); break;
Modified: hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumWriter.java?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumWriter.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumWriter.java Fri Jun 5 18:31:45 2009
@@ -21,7 +21,6 @@
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
-import java.util.Set;
import java.util.Map.Entry;
import org.apache.avro.AvroRuntimeException;
@@ -58,7 +57,7 @@
case MAP: writeMap(schema, datum, out); break;
case UNION:
int index = resolveUnion(schema, datum);
- out.writeLong(index);
+ out.writeIndex(index);
write(schema.getTypes().get(index), datum, out);
break;
case FIXED: writeFixed(schema, datum, out); break;
@@ -95,7 +94,7 @@
* representations.*/
protected void writeEnum(Schema schema, Object datum, ValueWriter out)
throws IOException {
- out.writeInt(schema.getEnumOrdinal((String)datum));
+ out.writeEnum(schema.getEnumOrdinal((String)datum));
}
/** Called to write a array. May be overridden for alternate array
@@ -104,12 +103,13 @@
throws IOException {
Schema element = schema.getElementType();
long size = getArraySize(datum);
- if (size > 0) {
- out.writeLong(size);
- for (Iterator<? extends Object> it = getArrayElements(datum); it.hasNext();)
- write(element, it.next(), out);
+ out.writeArrayStart();
+ out.setItemCount(size);
+ for (Iterator<? extends Object> it = getArrayElements(datum); it.hasNext();) {
+ out.startItem();
+ write(element, it.next(), out);
}
- out.writeLong(0);
+ out.writeArrayEnd();
}
/** Called by the default implementation of {@link #writeArray} to get the
@@ -133,14 +133,14 @@
throws IOException {
Schema value = schema.getValueType();
int size = getMapSize(datum);
- if (size > 0) {
- out.writeLong(size); // write a single block
- for (Map.Entry<Object,Object> entry : getMapEntries(datum)) {
- writeString(entry.getKey(), out);
- write(value, entry.getValue(), out);
- }
+ out.writeMapStart();
+ out.setItemCount(size);
+ for (Map.Entry<Object,Object> entry : getMapEntries(datum)) {
+ out.startItem();
+ out.writeString((Utf8) entry.getKey());
+ write(value, entry.getValue(), out);
}
- out.writeLong(0);
+ out.writeMapEnd();
}
/** Called by the default implementation of {@link #writeMap} to get the size
@@ -160,13 +160,13 @@
/** Called to write a string. May be overridden for alternate string
* representations.*/
protected void writeString(Object datum, ValueWriter out) throws IOException {
- out.writeUtf8((Utf8)datum);
+ out.writeString((Utf8)datum);
}
/** Called to write a bytes. May be overridden for alternate bytes
* representations.*/
protected void writeBytes(Object datum, ValueWriter out) throws IOException {
- out.writeBuffer((ByteBuffer)datum);
+ out.writeBytes((ByteBuffer)datum);
}
private int resolveUnion(Schema union, Object datum) {
@@ -207,7 +207,7 @@
* representations.*/
protected void writeFixed(Schema schema, Object datum, ValueWriter out)
throws IOException {
- out.write(((GenericFixed)datum).bytes(), 0, schema.getFixedSize());
+ out.writeFixed(((GenericFixed)datum).bytes(), 0, schema.getFixedSize());
}
/** Called by the default implementation of {@link #instanceOf}.*/
Added: hadoop/avro/trunk/src/java/org/apache/avro/io/BlockingValueWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/BlockingValueWriter.java?rev=782092&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/io/BlockingValueWriter.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/io/BlockingValueWriter.java Fri Jun 5 18:31:45 2009
@@ -0,0 +1,622 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.avro.AvroTypeException;
+import org.apache.avro.Schema;
+import org.apache.avro.util.Utf8;
+
+/** A {@link ValueWriter} that writes large arrays and maps as a sequence of
+ * blocks. So long as individual primitive values fit in memory, arbitrarily
+ * long arrays and maps may be written and subsequently read without exhausting
+ * memory. Values are buffered until the specified block size would be
+ * exceeded, minimizing block overhead.
+ * @see ValueWriter
+ */
+public class BlockingValueWriter extends ValueWriter {
+
+ /* Implementation note:
+ *
+ * Blocking is complicated because of nesting. If a large, nested
+ * value overflows your buffer, you've got to do a lot of dancing
+ * around to output the blocks correctly.
+ *
+ * To handle this complexity, this class keeps a stack of blocked
+ * values: each time a new block is started (e.g., by a call to
+ * {@link #writeArrayStart}), an entry is pushed onto this stack.
+ *
+ * In this stack, we keep track of the state of a block. Blocks can
+ * be in two states. "Regular" blocks have a non-zero byte count.
+ * "Overflow" blocks help us deal with the case where a block
+ * contains a value that's too big to buffer. In this case, the
+ * block contains only one item, and we give it an unknown
+ * byte-count. Because these values (1,unknown) are fixed, we're
+ * able to write the header for these overflow blocks to the
+ * underlying stream without seeing the entire block. After writing
+ * this header, we've freed our buffer space to be fully devoted to
+ * blocking the large, inner value.
+ */
+
+ private static class BlockedValue {
+ public enum State {
+ /**
+ * The bottom element of our stack represents being _outside_
+ * of a blocked value.
+ */
+ ROOT,
+
+ /**
+ * Represents the "regular" case, i.e., a blocked-value whose
+ * current block is fully contained in the buffer. In this
+ * case, {@link BlockedValue#start} points to the start of the
+ * block's _data_ -- but no room has been left for a header!
+ * When this block is terminated, its data will have to be
+ * moved over a bit to make room for the header. */
+ REGULAR,
+
+ /**
+ * Represents a blocked-value whose current block is in the
+ * overflow state. In this case, {@link BlockedValue#start} is zero. The
+ * header for such a block has _already been written_: the enclosing
+ * writer's compact() emits a header announcing a single item before
+ * switching the entry to this state. Any blocks
+ * _containing_ this block must be in the {@link #OVERFLOW}
+ * state.
+ * NOTE(review): earlier wording said a "zero" byte-count is also written
+ * to mark the physical length as unknown; compact() as shown writes only
+ * the item count -- confirm which is intended. */
+ OVERFLOW
+ };
+
+ /** The type of this blocked value (ARRAY or MAP). */
+ public Schema.Type type;
+
+ /** The state of this BlockedValue */
+ public State state;
+
+ /** The location in the buffer where this blocked value starts */
+ public int start;
+
+ /**
+ * The index one past the last byte for the previous item. If this
+ * is the first item, this is same as {@link #start}.
+ */
+ public int lastFullItem;
+
+ /**
+ * Number of items in this blocked value that are stored
+ * in the buffer.
+ */
+ public int items;
+
+ /** Number of items left to write*/
+ public long itemsLeftToWrite;
+
+ /** Builds the ROOT entry, i.e. the entry describing "not inside any blocked value". */
+ public BlockedValue() {
+   this.state = BlockedValue.State.ROOT;
+   this.type = null;
+   this.lastFullItem = 0;
+   this.start = 0;
+   // ROOT pretends to hold one item so the invariant assertions work out.
+   this.items = 1;
+ }
+
+ /**
+ * Builds an empty REGULAR entry of the given type whose data begins at
+ * buffer offset <code>start</code>. (The enclosing writer's compact() is
+ * what later moves an entry to OVERFLOW.)
+ */
+ public BlockedValue(Schema.Type type, int start) {
+   this.state = State.REGULAR;
+   this.type = type;
+   this.items = 0;
+   this.lastFullItem = start;
+   this.start = start;
+ }
+
+ /**
+ * Check invariants of <code>this</code> and also the
+ * <code>BlockedValue</code> containing <code>this</code>.
+ * Violations surface as assertion failures, so this is only effective
+ * when assertions are enabled.
+ *
+ * @param prev the stack entry directly beneath this one (expected to be
+ *             <code>null</code> only for the ROOT entry)
+ * @param pos the writer's current buffer position
+ * @return always <code>true</code>, so the call may itself be wrapped in
+ *         an <code>assert</code>
+ */
+ public boolean check(BlockedValue prev, int pos) {
+   assert state != State.ROOT || type == null;
+   assert (state == State.ROOT ||
+           type == Schema.Type.ARRAY || type == Schema.Type.MAP);
+
+   assert 0 <= items;
+   assert 0 != items || start == pos;         // 0==itms ==> start==pos
+   assert 1 < items || start == lastFullItem; // itms<=1 ==> start==lFI
+   assert items <= 1 || start <= lastFullItem; // 1<itms ==> start<=lFI
+   assert lastFullItem <= pos;
+
+   switch (state) {
+   case ROOT:
+     assert start == 0;
+     assert prev == null;
+     break;
+   case REGULAR:
+     assert start >= 0;
+     assert prev.lastFullItem <= start;
+     assert 1 <= prev.items;
+     break;
+   case OVERFLOW:
+     assert start == 0;
+     assert items == 1;
+     assert prev.state == State.ROOT || prev.state == State.OVERFLOW;
+     break;
+   }
+   // Reaching this point means every invariant held.
+   return true;
+ }
+ }
+
+ /**
+ * The buffer to hold the bytes before being written into the underlying
+ * stream.
+ */
+ private byte[] buf;
+
+ /**
+ * Index into the location in {@link #buf}, where next byte can be written.
+ */
+ private int pos;
+
+ /**
+ * The state stack.
+ */
+ private BlockedValue[] blockStack;
+ private int stackTop = -1;
+ private static int STACK_STEP = 10;
+
+ /** A ByteArrayOutputStream that exposes its backing array and fill count directly, without copying. */
+ private static final class EncoderBuffer extends ByteArrayOutputStream {
+   /** Number of valid bytes currently held. */
+   public int length() {
+     return this.count;
+   }
+
+   /** The live backing array (not a copy); only the first length() bytes are meaningful. */
+   public byte[] buffer() {
+     return this.buf;
+   }
+ }
+
+ private EncoderBuffer encoderBuffer = new EncoderBuffer();
+
+ private boolean check() {
+ assert out != null;
+ assert buf != null;
+ assert MIN_BUFFER_SIZE <= buf.length;
+ assert 0 <= pos;
+ assert pos <= buf.length : pos + " " + buf.length;
+
+ assert blockStack != null;
+ BlockedValue prev = null;
+ for (int i = 0; i <= stackTop; i++) {
+ BlockedValue v = blockStack[i];
+ v.check(prev, pos);
+ prev = v;
+ }
+ return true;
+ }
+
+ private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+ private static final int MIN_BUFFER_SIZE = 64;
+
+ /** Creates a writer over <code>out</code> using the default buffer size (DEFAULT_BUFFER_SIZE, 64 * 1024 bytes). */
+ public BlockingValueWriter(OutputStream out) {
+ this(out, DEFAULT_BUFFER_SIZE);
+ }
+
+ /**
+ * Creates a writer over <code>out</code> with an internal buffer of
+ * <code>bufferSize</code> bytes and a state stack holding only the ROOT entry.
+ *
+ * @param out the stream that receives the encoded bytes
+ * @param bufferSize size of the internal buffer in bytes
+ * @throws IllegalArgumentException if <code>bufferSize</code> is below MIN_BUFFER_SIZE
+ */
+ public BlockingValueWriter(OutputStream out, int bufferSize) {
+   super(out);
+   if (bufferSize < MIN_BUFFER_SIZE) {
+     throw new IllegalArgumentException("Buffer size too small.");
+   }
+   this.buf = new byte[bufferSize];
+   this.pos = 0;
+   blockStack = new BlockedValue[0];
+   expandStack();
+   // Push the bottom-of-stack entry and put it explicitly into the ROOT state.
+   BlockedValue bv = blockStack[++stackTop];
+   bv.type = null;
+   bv.state = BlockedValue.State.ROOT;
+   bv.start = bv.lastFullItem = 0;
+   bv.items = 1;
+
+   assert check();
+ }
+
+ private void expandStack() {
+ int oldLength = blockStack.length;
+ blockStack = Arrays.copyOf(blockStack,
+ blockStack.length + STACK_STEP);
+ for (int i = oldLength; i < blockStack.length; i++) {
+ blockStack[i] = new BlockedValue();
+ }
+ }
+
+ /**
+ * Redirects output to <code>out</code> and resets this writer to its initial
+ * state: the buffer position returns to zero and only the bottom stack entry
+ * remains active. Bytes still sitting in the buffer are not written first.
+ */
+ @Override
+ public void init(OutputStream out) throws IOException {
+   super.init(out);
+   this.stackTop = 0;
+   this.pos = 0;
+
+   assert check();
+ }
+
+ /**
+ * Pushes buffered data toward the underlying stream and then flushes that
+ * stream. What gets written depends on whether the writer is currently
+ * inside a blocked value (array or map).
+ */
+ @Override
+ public void flush() throws IOException {
+ BlockedValue bv = blockStack[stackTop];
+ if (bv.state == BlockedValue.State.ROOT) {
+ // Not inside any array or map: write the buffered bytes straight through.
+ out.write(buf, 0, pos);
+ pos = 0;
+ } else {
+ // Inside a blocked value: each compact() call converts one REGULAR stack
+ // entry to OVERFLOW (mutating the entry in place), so loop until the
+ // innermost entry itself has been converted.
+ while (bv.state != BlockedValue.State.OVERFLOW) {
+ compact();
+ }
+ }
+ out.flush();
+
+ assert check();
+ }
+
+ /** Buffers a boolean as a single byte: 1 for true, 0 for false. */
+ @Override
+ public void writeBoolean(boolean b) throws IOException {
+   if (pos + 1 > buf.length) {
+     ensure(1);
+   }
+   final byte encoded = b ? (byte) 1 : (byte) 0;
+   buf[pos] = encoded;
+   pos++;
+
+   assert check();
+ }
+
+ /** Buffers an int via encodeLong, first reserving 5 bytes of buffer space. */
+ @Override
+ public void writeInt(int n) throws IOException {
+   if (buf.length < pos + 5) {
+     ensure(5);
+   }
+   final int next = encodeLong(n, buf, pos);
+   pos = next;
+
+   assert check();
+ }
+
+ /** Buffers a long via encodeLong, first reserving 10 bytes of buffer space. */
+ @Override
+ public void writeLong(long n) throws IOException {
+   if (buf.length < pos + 10) {
+     ensure(10);
+   }
+   final int next = encodeLong(n, buf, pos);
+   pos = next;
+
+   assert check();
+ }
+
+ /** Buffers a float via encodeFloat, first reserving 4 bytes of buffer space. */
+ @Override
+ public void writeFloat(float f) throws IOException {
+   if (buf.length < pos + 4) {
+     ensure(4);
+   }
+   final int next = encodeFloat(f, buf, pos);
+   pos = next;
+
+   assert check();
+ }
+
+ /** Buffers a double via encodeDouble, first reserving 8 bytes of buffer space. */
+ @Override
+ public void writeDouble(double d) throws IOException {
+   if (buf.length < pos + 8) {
+     ensure(8);
+   }
+   final int next = encodeDouble(d, buf, pos);
+   pos = next;
+
+   assert check();
+ }
+
+ /** Writes a string as length-prefixed bytes, taking the bytes and length from the Utf8 value. */
+ @Override
+ public void writeString(Utf8 utf8) throws IOException {
+   final byte[] raw = utf8.getBytes();
+   final int rawLength = utf8.getLength();
+   writeBytes(raw, 0, rawLength);
+
+   assert check();
+ }
+
+ /**
+ * Writes the remaining bytes of <code>bytes</code> (position up to limit) as
+ * a length-prefixed value. The buffer's position and limit are left untouched.
+ *
+ * @param bytes the buffer to write; may be array-backed, direct or read-only
+ * @throws IOException if the underlying stream fails
+ */
+ @Override
+ public void writeBytes(ByteBuffer bytes) throws IOException {
+   final int len = bytes.remaining();
+   if (bytes.hasArray()) {
+     // position() is relative to arrayOffset(), which is non-zero for
+     // slices and for buffers wrapped over a sub-range of an array.
+     writeBytes(bytes.array(), bytes.arrayOffset() + bytes.position(), len);
+   } else {
+     // Direct and read-only buffers expose no array; copy through a
+     // duplicate so the caller's position is not disturbed.
+     final byte[] copy = new byte[len];
+     bytes.duplicate().get(copy);
+     writeBytes(copy, 0, len);
+   }
+
+   assert check();
+ }
+
+ /**
+ * Writes <code>len</code> bytes of <code>bytes</code>, starting at
+ * <code>start</code>, with no length prefix (contrast with writeBytes,
+ * which encodes the length first).
+ */
+ @Override
+ public void writeFixed(byte[] bytes, int start, int len) throws IOException {
+ doWriteBytes(bytes, start, len);
+
+ assert check();
+ }
+
+ /**
+ * Writes a length-prefixed byte sequence: the length is encoded into the
+ * buffer (5 bytes are reserved for it), then the payload is handed to
+ * doWriteBytes.
+ */
+ @Override
+ public void writeBytes(byte[] bytes, int start, int len) throws IOException {
+   if (buf.length < pos + 5) {
+     ensure(5);
+   }
+   final int afterLength = encodeLong(len, buf, pos);
+   pos = afterLength;
+   doWriteBytes(bytes, start, len);
+
+   assert check();
+ }
+
+ /**
+ * Opens an array: pushes an empty REGULAR entry of type ARRAY whose data
+ * starts at the current buffer position, growing the stack first if it is full.
+ */
+ @Override
+ public void writeArrayStart() throws IOException {
+   final int next = stackTop + 1;
+   if (next == blockStack.length) {
+     expandStack();
+   }
+   stackTop = next;
+
+   final BlockedValue entry = blockStack[next];
+   entry.items = 0;
+   entry.lastFullItem = pos;
+   entry.start = pos;
+   entry.state = BlockedValue.State.REGULAR;
+   entry.type = Schema.Type.ARRAY;
+
+   assert check();
+ }
+
+ /**
+ * Records how many items the caller is about to write into the innermost
+ * open array or map. Asserts that an array or map is open and that no
+ * previously announced items are still outstanding.
+ */
+ @Override
+ public void setItemCount(long itemCount) throws IOException {
+   final BlockedValue top = blockStack[stackTop];
+   assert top.type == Schema.Type.ARRAY || top.type == Schema.Type.MAP;
+   assert top.itemsLeftToWrite == 0;
+   top.itemsLeftToWrite = itemCount;
+
+   assert check();
+ }
+
+ /**
+ * Marks the start of the next item in the innermost open array or map. If
+ * that entry is in the OVERFLOW state it is first returned to an empty
+ * REGULAR state; then the item is counted and the current buffer position
+ * is remembered as the end of the last complete item.
+ */
+ @Override
+ public void startItem() throws IOException {
+   final BlockedValue top = blockStack[stackTop];
+   if (top.state == BlockedValue.State.OVERFLOW) {
+     finishOverflow();
+   }
+   top.itemsLeftToWrite--;
+   top.lastFullItem = pos;
+   top.items++;
+
+   assert check();
+ }
+
+ /**
+ * Closes the innermost open array.
+ *
+ * @throws AvroTypeException if the innermost open value is not an array, or
+ *         if announced items are still outstanding
+ */
+ @Override
+ public void writeArrayEnd() throws IOException {
+   final BlockedValue current = blockStack[stackTop];
+   final boolean insideArray = current.type == Schema.Type.ARRAY;
+   if (!insideArray) {
+     throw new AvroTypeException("Called writeArrayEnd outside of an array.");
+   }
+   final long outstanding = current.itemsLeftToWrite;
+   if (outstanding != 0) {
+     throw new AvroTypeException("Failed to write expected number of array elements.");
+   }
+   endBlockedValue();
+
+   assert check();
+ }
+
+ /**
+ * Opens a map: pushes an empty REGULAR entry of type MAP whose data starts
+ * at the current buffer position, growing the stack first if it is full.
+ */
+ @Override
+ public void writeMapStart() throws IOException {
+   final int next = stackTop + 1;
+   if (next == blockStack.length) {
+     expandStack();
+   }
+   stackTop = next;
+
+   final BlockedValue entry = blockStack[next];
+   entry.items = 0;
+   entry.lastFullItem = pos;
+   entry.start = pos;
+   entry.state = BlockedValue.State.REGULAR;
+   entry.type = Schema.Type.MAP;
+
+   assert check();
+ }
+
+ /**
+ * Closes the innermost open map.
+ *
+ * @throws AvroTypeException if the innermost open value is not a map, or if
+ *         items announced via setItemCount are still outstanding
+ * @throws IOException if the underlying stream fails
+ */
+ @Override
+ public void writeMapEnd() throws IOException {
+   BlockedValue top = blockStack[stackTop];
+   if (top.type != Schema.Type.MAP) {
+     throw new AvroTypeException("Called writeMapEnd outside of a map.");
+   }
+   if (top.itemsLeftToWrite != 0) {
+     throw new AvroTypeException("Failed to write expected number of map elements.");
+   }
+   endBlockedValue();
+
+   assert check();
+ }
+
+ /** Buffers a union branch index via encodeLong, first reserving 5 bytes of buffer space. */
+ @Override
+ public void writeIndex(int unionIndex) throws IOException {
+   if (buf.length < pos + 5) {
+     ensure(5);
+   }
+   final int next = encodeLong(unionIndex, buf, pos);
+   pos = next;
+
+   assert check();
+ }
+
+ /**
+ * Finishes the innermost open array or map: emits the header for its final
+ * block (if that block holds any items), pops the stack entry, and appends
+ * the zero sentinel that ends the blocked value. The loop exists only to
+ * retry after compact() has freed buffer space for the header.
+ */
+ private void endBlockedValue() throws IOException {
+ for (; ;) {
+ assert check();
+ BlockedValue t = blockStack[stackTop];
+ assert t.state != BlockedValue.State.ROOT;
+ if (t.state == BlockedValue.State.OVERFLOW) {
+ // Flushes the buffer and leaves t as an empty REGULAR entry (items == 0).
+ finishOverflow();
+ }
+ assert t.state == BlockedValue.State.REGULAR;
+ if (0 < t.items) {
+ int byteCount = pos - t.start;
+ // The header is a negated item count followed by the block's byte count.
+ if (t.start == 0 &&
+ blockStack[stackTop - 1].state
+ != BlockedValue.State.REGULAR) { // Lucky us -- don't have to move
+ // The block's data starts at the front of the buffer and the parent is
+ // not being buffered, so the header can go directly to the stream
+ // ahead of the still-buffered data.
+ encodeLong(-t.items, out);
+ encodeLong(byteCount, out);
+ } else {
+ // Encode the header off to the side to learn its size, then shift the
+ // block's data right and copy the header in front of it.
+ encodeLong(-t.items, encoderBuffer);
+ encodeLong(byteCount, encoderBuffer);
+ final int headerSize = encoderBuffer.length();
+ if (buf.length >= pos + headerSize) {
+ pos += headerSize;
+ final int m = t.start;
+ System.arraycopy(buf, m, buf, m + headerSize, byteCount);
+ System.arraycopy(encoderBuffer.buffer(), 0, buf, m, headerSize);
+ encoderBuffer.reset();
+ } else {
+ // No room for the header: free buffer space and start over.
+ encoderBuffer.reset();
+ compact();
+ continue;
+ }
+ }
+ }
+ stackTop--;
+ if (buf.length < (pos + 1)) ensure(1);
+ buf[pos++] = 0; // Sentinel for last block in a blocked value
+ assert check();
+ if (blockStack[stackTop].state == BlockedValue.State.ROOT) {
+ // Back at top level: nothing encloses this value, so write it out now.
+ flush();
+ }
+ return;
+ }
+ }
+
+ /**
+ * Called when we've finished writing the last item in an overflow
+ * buffer. When this is finished, the top of the stack will be
+ * an empty block in the "regular" state.
+ * @throws IOException
+ */
+ private void finishOverflow() throws IOException {
+ BlockedValue s = blockStack[stackTop];
+ if (s.state != BlockedValue.State.OVERFLOW) {
+ throw new IllegalStateException("Not an overflow block");
+ }
+ assert check();
+
+ // Flush any remaining data for this block
+ out.write(buf, 0, pos);
+ pos = 0;
+
+ // Reset top of stack to be in REGULAR mode
+ s.state = BlockedValue.State.REGULAR;
+ s.start = s.lastFullItem = 0;
+ s.items = 0;
+ assert check();
+ }
+
+ /**
+ * Makes sure at least <code>l</code> bytes are free in the buffer after
+ * <code>pos</code>, writing buffered data to the stream as needed.
+ *
+ * @param l number of free bytes required
+ * @throws IllegalArgumentException if <code>l</code> exceeds the buffer's total size
+ */
+ private void ensure(int l) throws IOException {
+ if (buf.length < l) {
+ throw new IllegalArgumentException("Too big: " + l);
+ }
+ while (buf.length < (pos + l)) {
+ if (blockStack[stackTop].state == BlockedValue.State.REGULAR) {
+ // The innermost value is still being buffered; compact() pushes data
+ // out by converting one REGULAR stack entry to OVERFLOW.
+ compact();
+ } else {
+ // ROOT or OVERFLOW on top: the buffered bytes can be written as they are.
+ out.write(buf, 0, pos);
+ pos = 0;
+ }
+ }
+ }
+
+ /**
+ * Writes <code>len</code> raw bytes from <code>bytes</code> starting at
+ * <code>start</code>, without any length prefix. Payloads smaller than the
+ * buffer are copied into it; larger ones are passed to write() after the
+ * buffer has been emptied.
+ */
+ private void doWriteBytes(byte[] bytes, int start, int len)
+ throws IOException {
+ if (len < buf.length) {
+ ensure(len);
+ System.arraycopy(bytes, start, buf, pos, len);
+ pos += len;
+ } else {
+ // Asking for the whole buffer forces ensure() to loop until pos == 0,
+ // i.e. until everything buffered so far has been written out.
+ ensure(buf.length);
+ assert blockStack[stackTop].state == BlockedValue.State.ROOT ||
+ blockStack[stackTop].state == BlockedValue.State.OVERFLOW;
+ write(bytes, start, len);
+ }
+ assert check();
+ }
+
+ /**
+ * Writes <code>len</code> bytes of <code>b</code> starting at <code>off</code>.
+ * At ROOT level the bytes go directly to the underlying stream; otherwise
+ * they are buffered, or written through once the buffer has been drained
+ * if they are at least as large as the whole buffer.
+ */
+ private void write(byte b[], int off, int len) throws IOException {
+ if (blockStack[stackTop].state == BlockedValue.State.ROOT) {
+ out.write(b, off, len);
+ } else {
+ assert check();
+ while (buf.length < (pos + len)) {
+ if (blockStack[stackTop].state == BlockedValue.State.REGULAR) {
+ compact();
+ } else {
+ out.write(buf, 0, pos);
+ pos = 0;
+ if (buf.length <= len) {
+ // The payload can never fit in the buffer: write it through and zero
+ // len so the loop exits and the copy below becomes a no-op.
+ out.write(b, off, len);
+ len = 0;
+ }
+ }
+ }
+ System.arraycopy(b, off, buf, pos, len);
+ pos += len;
+ assert check();
+ }
+ }
+
+ /**
+ * Frees buffer space by moving the outermost REGULAR-state value on the
+ * stack into the OVERFLOW state and writing its buffered bytes out.
+ * Only call if there are REGULAR-state values on the stack.
+ */
+ private void compact() throws IOException {
+ assert check();
+
+ // Find first REGULAR-state value
+ BlockedValue s = null;
+ int i;
+ for (i = 1; i <= stackTop; i++) {
+ s = blockStack[i];
+ if (s.state == BlockedValue.State.REGULAR) break;
+ }
+ // NOTE(review): s is null only when the stack holds just the ROOT entry;
+ // if entries exist but none is REGULAR, s is left at the top entry and
+ // this assert does not catch the misuse.
+ assert s != null;
+
+ // We're going to transition "s" into the overflow state. To do
+ // this, we're going to flush any bytes prior to "s", then write
+ // any full items of "s" into a block, start an overflow
+ // block, write any remaining bytes of "s" up to the start of the
+ // next more deeply-nested blocked-value, and finally move over
+ // any remaining bytes (which will be from more deeply-nested
+ // blocked values).
+
+ // Flush any bytes prior to "s"
+ out.write(buf, 0, s.start);
+
+ // Write any full items of "s"
+ if (1 < s.items) {
+ // All but the in-progress item form a complete block: negated item
+ // count, byte count, then the items' bytes.
+ encodeLong(-(s.items - 1), out);
+ encodeLong(s.lastFullItem - s.start, out);
+ out.write(buf, s.start, s.lastFullItem - s.start);
+ s.start = s.lastFullItem;
+ s.items = 1;
+ }
+
+ // Start an overflow block for s
+ // (its header is just an item count of 1)
+ encodeLong(1, out);
+
+ // Write any remaining bytes for "s", up to the next-most
+ // deeply-nested value
+ BlockedValue n = ((i + 1) <= stackTop ?
+ blockStack[i + 1] : null);
+ int end = (n == null ? pos : n.start);
+ out.write(buf, s.lastFullItem, end - s.lastFullItem);
+
+ // Move over any bytes that remain (and adjust indices)
+ System.arraycopy(buf, end, buf, 0, pos - end);
+ for (int j = i + 1; j <= stackTop; j++) {
+ n = blockStack[j];
+ n.start -= end;
+ n.lastFullItem -= end;
+ }
+ pos -= end;
+
+ assert s.items == 1;
+ s.start = s.lastFullItem = 0;
+ s.state = BlockedValue.State.OVERFLOW;
+
+ assert check();
+ }
+}
Modified: hadoop/avro/trunk/src/java/org/apache/avro/io/ValueReader.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/ValueReader.java?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/io/ValueReader.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/io/ValueReader.java Fri Jun 5 18:31:45 2009
@@ -18,117 +18,454 @@
package org.apache.avro.io;
import java.io.EOFException;
-import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
+import org.apache.avro.AvroTypeException;
+import org.apache.avro.ipc.ByteBufferInputStream;
import org.apache.avro.util.Utf8;
-/** Read leaf values.
- * <p>Has no state except that of the OutputStream it wraps.
- * <p>Used by {@link DatumReader} implementations to read datum leaf values.
- * @see ValueWriter
+/**
+ * Low-level support for de-serializing Avro values.
+ *
+ * This class has two types of methods. One type of methods support
+ * the reading of leaf values (for example, {@link #readLong} and
+ * {@link #readString}).
+ *
+ * The other type of methods support the reading of maps and arrays.
+ * These methods are {@link #readArrayStart}, {@link #arrayNext}
+ * (and similar methods for maps). (See {@link #readArrayStart} for
+ * details on these methods.)
+ *
+ * @see ValueWriter
*/
-public class ValueReader extends FilterInputStream {
+
+public class ValueReader {
+ private InputStream in;
+
+ private class ByteReader {
+ public ByteBuffer read(ByteBuffer old, int length) throws IOException {
+ ByteBuffer result;
+ if (old != null && length <= old.capacity()) {
+ result = old;
+ result.clear();
+ } else {
+ result = ByteBuffer.allocate(length);
+ }
+ doReadBytes(result.array(), result.position(), length);
+ result.limit(length);
+ return result;
+ }
+ }
+
+ private class ReuseByteReader extends ByteReader {
+ private final ByteBufferInputStream bbi;
+
+ public ReuseByteReader(ByteBufferInputStream bbi) {
+ this.bbi = bbi;
+ }
+
+ @Override
+ public ByteBuffer read(ByteBuffer old, int length) throws IOException {
+ if (old != null) {
+ return super.read(old, length);
+ } else {
+ return bbi.readBuffer(length);
+ }
+ }
+
+ }
+
+ private final ByteReader byteReader;
+
public ValueReader(InputStream in) {
- super(in);
+ this.in = in;
+ byteReader = (in instanceof ByteBufferInputStream) ?
+ new ReuseByteReader((ByteBufferInputStream) in) : new ByteReader();
}
- /** Same contract as {@link InputStream#read()}, except that EOFException is
- * throw when EOF reached rather than returning -1.
- * @throws EOFException if at EOF. */
- public int read() throws IOException {
- int value = in.read();
- if (value < 0) throw new EOFException();
- return value;
- }
-
- /** Read a string written by {@link ValueWriter#writeUtf8(Utf8)}.
- * @throws EOFException if EOF is reached before reading all the bytes. */
- public Utf8 readUtf8(Object old) throws IOException {
- Utf8 utf8 = old instanceof Utf8 ? (Utf8)old : new Utf8();
- utf8.setLength((int)readLong());
- readBytes(utf8.getBytes(), 0, utf8.getLength());
- return utf8;
- }
- /** Read buffer written by {@link ValueWriter#writeBuffer(ByteBuffer)}.
- * @throws EOFException if EOF is reached before reading all the bytes. */
- public ByteBuffer readBuffer(Object old) throws IOException {
- int length = (int)readLong();
- ByteBuffer bytes;
- if ((old instanceof ByteBuffer) && ((ByteBuffer)old).capacity() >= length) {
- bytes = (ByteBuffer)old;
- bytes.clear();
- } else {
- bytes = ByteBuffer.allocate(length);
- }
- readBytes(bytes.array(), 0, length);
- bytes.limit(length);
- return bytes;
+ /** Start reading against a different input stream. Stateful
+ * subclasses will reset their states to their initial state. */
+ public void init(InputStream in) {
+ this.in = in;
+ }
+
+ /**
+ * "Reads" a null value. (Doesn't actually read anything, but
+ * advances the state of the parser if the implementation is
+ * stateful.)
+ * @throws AvroTypeException If this is a stateful reader and
+ * null is not the type of the next value to be read
+ */
+ public void readNull() throws IOException { }
+
+ /**
+ * Reads a boolean value written by {@link ValueWriter#writeBoolean}.
+ * @throws AvroTypeException If this is a stateful reader and
+ * boolean is not the type of the next value to be read
+ */
+
+ public boolean readBoolean() throws IOException {
+ int n = in.read();
+ if (n < 0) {
+ throw new EOFException();
+ }
+ return n == 1;
}
- /** Read an int written by {@link ValueWriter#writeInt(int)}.
- * @throws EOFException if EOF is reached before reading all the bytes.*/
+ /**
+ * Reads an integer written by {@link ValueWriter#writeInt}.
+ * @throws AvroTypeException If encoded value is larger than
+ * 32-bits
+ * @throws AvroTypeException If this is a stateful reader and
+ * int is not the type of the next value to be read
+ */
public int readInt() throws IOException {
- return (int)readLong();
+ long result = readLong();
+ if (result < Integer.MIN_VALUE || Integer.MAX_VALUE < result) {
+ throw new AvroTypeException("Integer overflow.");
+ }
+ return (int)result;
}
- /** Read a long written by {@link ValueWriter#writeLong(long)}.
- * @throws EOFException if EOF is reached before reading all the bytes. */
+ /**
+ * Reads a long written by {@link ValueWriter#writeLong}.
+ * @throws AvroTypeException If this is a stateful reader and
+ * long is not the type of the next value to be read
+ */
public long readLong() throws IOException {
- long b = read();
- long n = b & 0x7F;
- for (int shift = 7; (b & 0x80) != 0; shift += 7) {
- b = read();
- n |= (b & 0x7F) << shift;
+ long n = 0;
+ for (int shift = 0; ; shift += 7) {
+ long b = in.read();
+ if (b >= 0) {
+ n |= (b & 0x7F) << shift;
+ if ((b & 0x80) == 0) {
+ break;
+ }
+ } else {
+ throw new EOFException();
+ }
}
- return (n >>> 1) ^ -(n & 1); // back to two's-complement
+ return (n >>> 1) ^ -(n & 1); // back to two's-complement
}
- /** Read a float written by {@link ValueWriter#writeFloat(float)}.
- * @throws EOFException if EOF is reached before reading all the bytes. */
+ /**
+ * Reads a float written by {@link ValueWriter#writeFloat}.
+ * @throws AvroTypeException If this is a stateful reader and
+ * float is not the type of the next value to be read
+ */
public float readFloat() throws IOException {
- return Float.intBitsToFloat(((read() & 0xff) ) |
- ((read() & 0xff) << 8) |
- ((read() & 0xff) << 16) |
- ((read() & 0xff) << 24));
+ int n = 0;
+ for (int i = 0, shift = 0; i < 4; i++, shift += 8) {
+ int k = in.read();
+ if (k >= 0) {
+ n |= (k & 0xff) << shift;
+ } else {
+ throw new EOFException();
+ }
+ }
+ return Float.intBitsToFloat(n);
}
- /** Read a double written by {@link ValueWriter#writeDouble(double)}.
- * @throws EOFException if EOF is reached before reading all the bytes. */
+ /**
+ * Reads a double written by {@link ValueWriter#writeDouble}.
+ * @throws AvroTypeException If this is a stateful reader and
+ * double is not the type of the next value to be read
+ */
public double readDouble() throws IOException {
- return Double.longBitsToDouble(((read() & 0xffL) ) |
- ((read() & 0xffL) << 8) |
- ((read() & 0xffL) << 16) |
- ((read() & 0xffL) << 24) |
- ((read() & 0xffL) << 32) |
- ((read() & 0xffL) << 40) |
- ((read() & 0xffL) << 48) |
- ((read() & 0xffL) << 56));
+ long n = 0;
+ for (int i = 0, shift = 0; i < 8; i++, shift += 8) {
+ long k = in.read();
+ if (k >= 0) {
+ n |= (k & 0xff) << shift;
+ } else {
+ throw new EOFException();
+ }
+ }
+ return Double.longBitsToDouble(n);
}
-
- /** Read a boolean written by {@link ValueWriter#writeBoolean(boolean)}.
- * @throws EOFException if EOF is reached before reading all the bytes. */
- public boolean readBoolean() throws IOException {
- return read() == 1;
+
+ /**
+ * Reads a char-string written by {@link ValueWriter#writeString}.
+ * @throws AvroTypeException If this is a stateful reader and
+ * char-string is not the type of the next value to be read
+ */
+ public Utf8 readString(Utf8 old) throws IOException {
+ int length = readInt();
+ Utf8 result = (old != null ? old : new Utf8());
+ result.setLength(length);
+ doReadBytes(result.getBytes(), 0, length);
+ return result;
+ }
+
+ /**
+ * Discards a char-string written by {@link ValueWriter#writeString}.
+ * @throws AvroTypeException If this is a stateful reader and
+ * char-string is not the type of the next value to be read
+ */
+ public void skipString() throws IOException {
+ doSkipBytes(readInt());
+ }
+
+ /**
+ * Reads a byte-string written by {@link ValueWriter#writeBytes}.
+ * If <tt>old</tt> is not null and has sufficient capacity to take in
+ * the bytes being read, the bytes are returned in <tt>old</tt>.
+ * @throws AvroTypeException If this is a stateful reader and
+ * byte-string is not the type of the next value to be read
+ */
+ public ByteBuffer readBytes(ByteBuffer old) throws IOException {
+ int length = readInt();
+ return byteReader.read(old, length);
+ }
+
+ /**
+ * Discards a byte-string written by {@link ValueWriter#writeBytes}.
+ * @throws AvroTypeException If this is a stateful reader and
+ * byte-string is not the type of the next value to be read
+ */
+ public void skipBytes() throws IOException {
+ doSkipBytes(readInt());
+ }
+
+ /**
+ * Reads fixed sized binary object.
+ * @param bytes The buffer to store the contents being read.
+ * @param start The position where the data needs to be written.
+ * @param length The size of the binary object.
+ * @throws AvroTypeException If this is a stateful reader and
+ * fixed sized binary object is not the type of the next
+ * value to be read or the length is incorrect.
+ * @throws IOException
+ */
+ public void readFixed(byte[] bytes, int start, int length)
+ throws IOException {
+ doReadBytes(bytes, start, length);
}
- /** Read bytes into an array.
- * @throws EOFException if EOF is reached before reading all the bytes. */
- public void readBytes(byte[] buffer) throws IOException {
- readBytes(buffer, 0, buffer.length);
+ /**
+ * A shorthand for <tt>readFixed(bytes, 0, bytes.length)</tt>.
+ * @throws AvroTypeException If this is a stateful reader and
+ * fixed sized binary object is not the type of the next
+ * value to be read or the length is incorrect.
+ * @throws IOException
+ */
+ public void readFixed(byte[] bytes) throws IOException {
+ readFixed(bytes, 0, bytes.length);
+ }
+
+ /**
+ * Discards fixed sized binary object.
+ * @param length The size of the binary object to be skipped.
+ * @throws AvroTypeException If this is a stateful reader and
+ * fixed sized binary object is not the type of the next
+ * value to be read or the length is incorrect.
+ * @throws IOException
+ */
+ public void skipFixed(int length) throws IOException {
+ doSkipBytes(length);
+ }
+
+ /**
+ * Reads an enumeration.
+ * @return The enumeration's value.
+ * @throws AvroTypeException If this is a stateful reader and
+ * enumeration is not the type of the next value to be read.
+ * @throws IOException
+ */
+ public int readEnum() throws IOException {
+ return readInt();
+ }
+
+ private void doSkipBytes(long length) throws IOException, EOFException {
+ while (length > 0) {
+ long n = in.skip(length);
+ if (n <= 0) {
+ throw new EOFException();
+ }
+ length -= n;
+ }
}
- /** Read bytes into an array.
- * @throws EOFException if EOF is reached before reading all the bytes. */
- public void readBytes(byte[] buffer, int offset, int length)
+
+ /**
+ * Reads <tt>length</tt> bytes into <tt>bytes</tt> starting at
+ * <tt>start</tt>.
+ * @throws EOFException If there are not enough bytes in
+ * the stream.
+ * @throws IOException
+ */
+ private void doReadBytes(byte[] bytes, int start, int length)
throws IOException {
- int total = 0;
- while (total < length) {
- int n = read(buffer, offset+total, length-total);
+ while (length > 0) {
+ int n = in.read(bytes, start, length);
if (n < 0) throw new EOFException();
- total += n;
+ start += n;
+ length -= n;
+ }
+ }
+
+/**
+ * Returns the number of items to follow in the current array or map.
+ * Returns 0 if there are no more items in the current array and the array/map
+ * has ended.
+ * @return the item count of the next block, or 0 at the end of the array/map
+ * @throws IOException
+ */
+ private long doReadItemCount() throws IOException {
+ long result = readLong();
+ if (result < 0) {
+ readLong(); // Consume byte-count if present
+ result = -result;
}
+ return result;
}
+ /**
+ * Reads the count of items in the current array or map and skips those
+ * items, if possible. If it could skip the items, keep repeating until
+ * there are no more items left in the array or map. If items cannot be
+ * skipped (because byte count to skip is not found in the stream)
+ * return the count of the items found. The client needs to skip the
+ * items individually.
+ * @return Zero if there are no more items to skip and end of array/map
+ * is reached. Positive number if some items are found that cannot be
+ * skipped and the client needs to skip them individually.
+ * @throws IOException
+ */
+ private long doSkipItems() throws IOException {
+ long result = readInt();
+ while (result < 0) {
+ long bytecount = readLong();
+ doSkipBytes(bytecount);
+ result = readInt();
+ }
+ return result;
+ }
+
+ /**
+ * Reads and returns the size of the first block of an array. If
+ * this method returns non-zero, then the caller should read the
+ * indicated number of items, and then call {@link
+ * #arrayNext} to find out the number of items in the next
+ * block. The typical pattern for consuming an array looks like:
+ * <pre>
+ * for(long i = in.readArrayStart(); i != 0; i = in.arrayNext()) {
+ * for (long j = 0; j < i; j++) {
+ * read next element of the array;
+ * }
+ * }
+ * </pre>
+ * @throws AvroTypeException If this is a stateful reader and
+ * array is not the type of the next value to be read */
+ public long readArrayStart() throws IOException {
+ return doReadItemCount();
+ }
+
+ /**
+ * Processes the next block of an array and returns the number of items in
+ * the block and lets the caller
+ * read those items.
+ * @throws AvroTypeException When called outside of an
+ * array context
+ */
+ public long arrayNext() throws IOException {
+ return doReadItemCount();
+ }
+
+ /**
+ * Used for quickly skipping through an array. Note you can
+ * either skip the entire array, or read the entire array (with
+ * {@link #readArrayStart}), but you can't mix the two on the
+ * same array.
+ *
+ * This method will skip through as many items as it can, all of
+ * them if possible. It will return zero if there are no more
+ * items to skip through, or an item count if it needs the client's
+ * help in skipping. The typical usage pattern is:
+ * <pre>
+ * for(long i = in.skipArray(); i != 0; i = in.skipArray()) {
+ * for (long j = 0; j < i; j++) {
+ * read and discard the next element of the array;
+ * }
+ * }
+ * </pre>
+ * Note that this method can automatically skip through items if a
+ * byte-count is found in the underlying data, or if a schema has
+ * been provided to the implementation, but
+ * otherwise the client will have to skip through items itself.
+ *
+ * @throws AvroTypeException If this is a stateful reader and
+ * array is not the type of the next value to be read
+ */
+ public long skipArray() throws IOException {
+ return doSkipItems();
+ }
+
+ /**
+ * Reads and returns the size of the next block of map-entries.
+ * Similar to {@link #readArrayStart}.
+ *
+ * As an example, let's say you want to read a map of records,
+ * the record consisting of an Integer field and a Boolean field.
+ * Your code would look something like this:
+ * <pre>
+ * Map<String,Record> m = new HashMap<String,Record>();
+ * Record reuse = new Record();
+ * for(long i = in.readMapStart(); i != 0; i = in.mapNext()) {
+ * for (long j = 0; j < i; j++) {
+ * String key = in.readString();
+ * reuse.intField = in.readInt();
+ * reuse.boolField = in.readBoolean();
+ * m.put(key, reuse);
+ * }
+ * }
+ * </pre>
+ * @throws AvroTypeException If this is a stateful reader and
+ * map is not the type of the next value to be read
+ */
+ public long readMapStart() throws IOException {
+ return doReadItemCount();
+ }
+
+ /**
+ * Processes the next block of map entries and returns the count of them.
+ * Similar to {@link #arrayNext}. See {@link #readMapStart} for details.
+ * @throws AvroTypeException When called outside of a
+ * map context
+ */
+ public long mapNext() throws IOException {
+ return doReadItemCount();
+ }
+
+ /**
+ * Support for quickly skipping through a map similar to {@link #skipArray}.
+ *
+ * As an example, let's say you want to skip a map of records,
+ * the record consisting of an Integer field and a Boolean field.
+ * Your code would look something like this:
+ * <pre>
+ * for(long i = in.skipMap(); i != 0; i = in.skipMap()) {
+ * for (long j = 0; j < i; j++) {
+ * in.skipString(); // Discard key
+ * in.readInt(); // Discard int-field of value
+ * in.readBoolean(); // Discard boolean-field of value
+ * }
+ * }
+ * </pre>
+ * @throws AvroTypeException If this is a stateful reader and
+ * map is not the type of the next value to be read */
+
+ public long skipMap() throws IOException {
+ return doSkipItems();
+ }
+
+ /**
+ * Reads the tag of a union written by {@link ValueWriter#writeIndex}.
+ * @throws AvroTypeException If this is a stateful reader and
+ * union is not the type of the next value to be read
+ */
+ public int readIndex() throws IOException {
+ return readInt();
+ }
}
Modified: hadoop/avro/trunk/src/java/org/apache/avro/io/ValueWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/ValueWriter.java?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/io/ValueWriter.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/io/ValueWriter.java Fri Jun 5 18:31:45 2009
@@ -17,75 +17,404 @@
*/
package org.apache.avro.io;
-import java.io.*;
+import java.io.IOException;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
+
+import org.apache.avro.AvroTypeException;
+import org.apache.avro.ipc.ByteBufferOutputStream;
import org.apache.avro.util.Utf8;
-/** Write leaf values.
- * <p>Has no state except that of the OutputStream it wraps.
- * <p>Used by {@link DatumWriter} implementations to write datum leaf values.
- * @see ValueReader
+/**
+ * Low-level support for serializing Avro values.
+ *
+ * This class has two types of methods. One type of methods support
+ * the writing of leaf values (for example, {@link #writeLong} and
+ * {@link #writeString}). These methods have analogs in {@link
+ * ValueReader}.
+ *
+ * The other type of methods support the writing of maps and arrays.
+ * These methods are {@link #writeArrayStart}, {@link
+ * #startItem}, and {@link #writeArrayEnd} (and similar methods for
+ * maps). Some implementations of {@link ValueWriter} handle the
+ * buffering required to break large maps and arrays into blocks,
+ * which is necessary for applications that want to do streaming.
+ * (See {@link #writeArrayStart} for details on these methods.)
+ *
+ * @see ValueReader
*/
-public class ValueWriter extends FilterOutputStream {
+public class ValueWriter {
+ protected OutputStream out;
+
+ private interface ByteWriter {
+ public void write(ByteBuffer bytes) throws IOException;
+ }
+
+ private static final class SimpleByteWriter implements ByteWriter {
+ private final OutputStream out;
+
+ public SimpleByteWriter(OutputStream out) {
+ this.out = out;
+ }
+
+ @Override
+ public void write(ByteBuffer bytes) throws IOException {
+ encodeLong(bytes.remaining(), out);
+ out.write(bytes.array(), bytes.position(), bytes.remaining());
+ }
+ }
+
+ private static final class ReuseByteWriter implements ByteWriter {
+ private final ByteBufferOutputStream bbout;
+ public ReuseByteWriter(ByteBufferOutputStream bbout) {
+ this.bbout = bbout;
+ }
+
+ @Override
+ public void write(ByteBuffer bytes) throws IOException {
+ encodeLong(bytes.remaining(), bbout);
+ bbout.writeBuffer(bytes);
+ }
+ }
+
+ private final ByteWriter byteWriter;
+
+ /** Create a writer that sends its output to the underlying stream
+ * <code>out</code>. */
public ValueWriter(OutputStream out) {
- super(out);
+ this.out = out;
+ this.byteWriter = (out instanceof ByteBufferOutputStream) ?
+ new ReuseByteWriter((ByteBufferOutputStream) out) :
+ new SimpleByteWriter(out);
}
- /** Write a string as {@link #writeLong(long)}-prefixed UTF-8. */
- public void writeUtf8(Utf8 utf8) throws IOException {
- writeLong(utf8.getLength());
- out.write(utf8.getBytes(), 0, utf8.getLength());
+
+ /** Redirect output (and reset the parser state if we're checking). */
+ public void init(OutputStream out) throws IOException {
+ flush();
+ this.out = out;
+ }
+
+ /**
+ * Writes any buffered output to the underlying stream.
+ */
+ public void flush() throws IOException {
+ out.flush();
}
- /** Write a buffer of bytes. */
- public void writeBuffer(ByteBuffer bytes) throws IOException {
- writeLong(bytes.remaining());
- out.write(bytes.array(), bytes.position(), bytes.remaining());
- }
- /** Write an int using 1-5 bytes. The sign is moved to the low-order bit,
- * and then the value is written so that the high-order bit of each byte
- * indicates whether more bytes remain. */
- public void writeInt(int n) throws IOException { writeLong(n); }
-
- /** Write a long using 1-10 bytes. The sign is moved to the low-order bit,
- * and then the value is written so that the high-order bit of each byte
- * indicates whether more bytes remain. */
+
+ /**
+ * "Writes" a null value. (Doesn't actually write anything, but
+ * advances the state of the parser if this class is stateful.)
+ * @throws AvroTypeException If this is a stateful writer and a
+ * null is not expected
+ */
+ public void writeNull() throws IOException { }
+
+ /**
+ * Write a boolean value.
+ * @throws AvroTypeException If this is a stateful writer and a
+ * boolean is not expected
+ */
+ public void writeBoolean(boolean b) throws IOException {
+ out.write(b ? 1 : 0);
+ }
+
+ /**
+ * Writes a 32-bit integer.
+ * @throws AvroTypeException If this is a stateful writer and an
+ * integer is not expected
+ */
+ public void writeInt(int n) throws IOException {
+ encodeLong(n, out);
+ }
+
+ /**
+ * Write a 64-bit integer.
+ * @throws AvroTypeException If this is a stateful writer and a
+ * long is not expected
+ */
public void writeLong(long n) throws IOException {
- n = (n << 1) ^ (n >> 63); // move sign to low-order bit
+ encodeLong(n, out);
+ }
+
+ /** Write a float.
+ * @throws IOException
+ * @throws AvroTypeException If this is a stateful writer and a
+ * float is not expected
+ */
+ public void writeFloat(float f) throws IOException {
+ encodeFloat(f, out);
+ }
+
+ /**
+ * Write a double.
+ * @throws AvroTypeException If this is a stateful writer and a
+ * double is not expected
+ */
+ public void writeDouble(double d) throws IOException {
+ encodeDouble(d, out);
+ }
+
+ /**
+ * Write a Unicode character string.
+ * @throws AvroTypeException If this is a stateful writer and a
+ * char-string is not expected
+ */
+ public void writeString(Utf8 utf8) throws IOException {
+ encodeLong(utf8.getLength(), out);
+ out.write(utf8.getBytes(), 0, utf8.getLength());
+ }
+
+ /**
+ * Write a Unicode character string.
+ * @throws AvroTypeException If this is a stateful writer and a
+ * char-string is not expected
+ */
+ public void writeString(String str) throws IOException {
+ writeString(new Utf8(str));
+ }
+
+ /**
+ * Write a byte string.
+ * @throws AvroTypeException If this is a stateful writer and a
+ * byte-string is not expected
+ */
+ public void writeBytes(ByteBuffer bytes) throws IOException {
+ byteWriter.write(bytes);
+ }
+
+ /**
+ * Write a byte string.
+ * @throws AvroTypeException If this is a stateful writer and a
+ * byte-string is not expected
+ */
+ public void writeBytes(byte[] bytes, int start, int len) throws IOException {
+ encodeLong(len, out);
+ out.write(bytes, start, len);
+ }
+
+ /**
+ * Writes a byte string.
+ * Equivalent to <tt>writeBytes(bytes, 0, bytes.length)</tt>
+ * @throws IOException
+ * @throws AvroTypeException If this is a stateful writer and a
+ * byte-string is not expected
+ */
+ public void writeBytes(byte[] bytes) throws IOException {
+ writeBytes(bytes, 0, bytes.length);
+ }
+
+ /**
+ * Writes a fixed size binary object.
+ * @param bytes The contents to write
+ * @param start The position within <tt>bytes</tt> where the contents
+ * start.
+ * @param len The number of bytes to write.
+ * @throws AvroTypeException If this is a stateful writer and a
+ * byte-string is not expected
+ * @throws IOException
+ */
+ public void writeFixed(byte[] bytes, int start, int len) throws IOException {
+ out.write(bytes, start, len);
+ }
+
+ /**
+ * A shorthand for <tt>writeFixed(bytes, 0, bytes.length)</tt>
+ * @param bytes
+ */
+ public void writeFixed(byte[] bytes) throws IOException {
+ writeFixed(bytes, 0, bytes.length);
+ }
+
+ /**
+ * Writes an enumeration.
+ * @param e
+ * @throws AvroTypeException If this is a stateful writer and an enumeration
+ * is not expected or the <tt>e</tt> is out of range.
+ * @throws IOException
+ */
+ public void writeEnum(int e) throws IOException {
+ encodeLong(e, out);
+ }
+
+ /** Call this method to start writing an array.
+ *
+ * When starting to serialize an array, call {@link
+ * #writeArrayStart}. Then, before writing any data for any item
+ * call {@link #setItemCount} followed by a sequence of
+ * {@link #startItem()} and the item itself. The number of
+ * {@link #startItem()} should match the number specified in
+ * {@link #setItemCount}.
+ * When actually writing the data of the item, you can call any {@link
+ * ValueWriter} method (e.g., {@link #writeLong}). When all items
+ * of the array have been written, call {@link #writeArrayEnd}.
+ *
+ * As an example, let's say you want to write an array of records,
+ * the record consisting of a Long field and a Boolean field.
+ * Your code would look something like this:
+ * <pre>
+ * out.writeArrayStart();
+ * out.setItemCount(list.size());
+ * for (Record r : list) {
+ * out.startItem();
+ * out.writeLong(r.longField);
+ * out.writeBoolean(r.boolField);
+ * }
+ * out.writeArrayEnd();
+ * </pre>
+ * @throws AvroTypeException If this is a stateful writer and an
+ * array is not expected
+ */
+ public void writeArrayStart() throws IOException {
+ }
+
+ /**
+ * Call this method before writing a batch of items in an array or a map.
+ * Then for each item, call {@link #startItem()} followed by any of the
+ * other write methods of {@link ValueWriter}. The number of calls
+ * to {@link #startItem()} must be equal to the count specified
+ * in {@link #setItemCount}. Once a batch is completed you
+ * can start another batch with {@link #setItemCount}.
+ *
+ * @param itemCount The number of {@link #startItem()} calls to follow.
+ * @throws IOException
+ */
+ public void setItemCount(long itemCount) throws IOException {
+ if (itemCount > 0) {
+ writeLong(itemCount);
+ }
+ }
+
+ /**
+ * Start a new item of an array or map.
+ * See {@link #writeArrayStart} for usage information.
+ * @throws AvroTypeException If called outside of an array or map context
+ */
+ public void startItem() throws IOException {
+ }
+
+ /**
+ * Call this method to finish writing an array.
+ * See {@link #writeArrayStart} for usage information.
+ *
+ * @throws AvroTypeException If items written does not match count
+ * provided to {@link #setItemCount}
+ * @throws AvroTypeException If not currently inside an array
+ */
+ public void writeArrayEnd() throws IOException {
+ encodeLong(0, out);
+ }
+
+ /**
+ * Call this to start a new map. See
+ * {@link #writeArrayStart} for details on usage.
+ *
+ * As an example of usage, let's say you want to write a map of
+ * records, the record consisting of a Long field and a Boolean
+ * field. Your code would look something like this:
+ * <pre>
+ * out.writeMapStart();
+ * out.setItemCount(map.size());
+ * for (Map.Entry<String,Record> entry : map.entrySet()) {
+ * out.startItem();
+ * out.writeString(entry.getKey());
+ * out.writeLong(entry.getValue().longField);
+ * out.writeBoolean(entry.getValue().boolField);
+ * }
+ * out.writeMapEnd();
+ * </pre>
+ * @throws AvroTypeException If this is a stateful writer and a
+ * map is not expected
+ */
+ public void writeMapStart() throws IOException {
+ }
+
+ /**
+ * Call this method to terminate the inner-most, currently-opened
+ * map. See {@link #writeArrayStart} for more details.
+ *
+ * @throws AvroTypeException If items written does not match count
+ * provided to {@link #setItemCount}
+ * @throws AvroTypeException If not currently inside a map
+ */
+ public void writeMapEnd() throws IOException {
+ encodeLong(0, out);
+ }
+
+ /** Call this method to write the tag of a union.
+ *
+ * As an example of usage, let's say you want to write a union,
+ * whose second branch is a record consisting of a Long field and
+ * a Boolean field. Your code would look something like this:
+ * <pre>
+ * out.writeIndex(1);
+ * out.writeLong(record.longField);
+ * out.writeBoolean(record.boolField);
+ * </pre>
+ * @throws AvroTypeException If this is a stateful writer and a
+ * union is not expected
+ */
+ public void writeIndex(int unionIndex) throws IOException {
+ encodeLong(unionIndex, out);
+ }
+
+ protected static void encodeLong(long n, OutputStream o) throws IOException {
+ n = (n << 1) ^ (n >> 63); // move sign to low-order bit
+ while ((n & ~0x7F) != 0) {
+ o.write((byte)((n & 0x7f) | 0x80));
+ n >>>= 7;
+ }
+ o.write((byte)n);
+ }
+
+ protected static int encodeLong(long n, byte[] b, int pos) {
+ n = (n << 1) ^ (n >> 63); // move sign to low-order bit
while ((n & ~0x7F) != 0) {
- out.write((byte)((n & 0x7f) | 0x80));
+ b[pos++] = (byte)((n & 0x7f) | 0x80);
n >>>= 7;
}
- out.write((byte)n);
+ b[pos++] = (byte) n;
+ return pos;
}
- /** Writes a float as eight bytes. */
- public void writeFloat(float n) throws IOException {
- int bits = Float.floatToRawIntBits(n);
- out.write((int)(bits ) & 0xFF);
- out.write((int)(bits >> 8) & 0xFF);
- out.write((int)(bits >> 16) & 0xFF);
- out.write((int)(bits >> 24) & 0xFF);
- }
-
- /** Writes a double as eight bytes. */
- public void writeDouble(double n) throws IOException {
- long bits = Double.doubleToRawLongBits(n);
- out.write((int)(bits ) & 0xFF);
- out.write((int)(bits >> 8) & 0xFF);
- out.write((int)(bits >> 16) & 0xFF);
- out.write((int)(bits >> 24) & 0xFF);
- out.write((int)(bits >> 32) & 0xFF);
- out.write((int)(bits >> 40) & 0xFF);
- out.write((int)(bits >> 48) & 0xFF);
- out.write((int)(bits >> 56) & 0xFF);
+ protected static void encodeFloat(float f, OutputStream o) throws IOException {
+ long bits = Float.floatToRawIntBits(f);
+ o.write((int)(bits ) & 0xFF);
+ o.write((int)(bits >> 8) & 0xFF);
+ o.write((int)(bits >> 16) & 0xFF);
+ o.write((int)(bits >> 24) & 0xFF);
}
-
- /** Writes a boolean as a single byte. */
- public void writeBoolean(boolean b) throws IOException {
- out.write(b ? 1 : 0);
+
+ protected static int encodeFloat(float f, byte[] b, int pos) {
+ long bits = Float.floatToRawIntBits(f);
+ b[pos++] = (byte)((bits ) & 0xFF);
+ b[pos++] = (byte)((bits >> 8) & 0xFF);
+ b[pos++] = (byte)((bits >> 16) & 0xFF);
+ b[pos++] = (byte)((bits >> 24) & 0xFF);
+ return pos;
}
- public void write(byte b[], int off, int len) throws IOException {
- out.write(b, off, len);
+ protected static void encodeDouble(double d, OutputStream o) throws IOException {
+ long bits = Double.doubleToRawLongBits(d);
+ o.write((int)(bits ) & 0xFF);
+ o.write((int)(bits >> 8) & 0xFF);
+ o.write((int)(bits >> 16) & 0xFF);
+ o.write((int)(bits >> 24) & 0xFF);
+ o.write((int)(bits >> 32) & 0xFF);
+ o.write((int)(bits >> 40) & 0xFF);
+ o.write((int)(bits >> 48) & 0xFF);
+ o.write((int)(bits >> 56) & 0xFF);
}
+ protected static int encodeDouble(double d, byte[] b, int pos) {
+ long bits = Double.doubleToRawLongBits(d);
+ b[pos++] = (byte)((bits ) & 0xFF);
+ b[pos++] = (byte)((bits >> 8) & 0xFF);
+ b[pos++] = (byte)((bits >> 16) & 0xFF);
+ b[pos++] = (byte)((bits >> 24) & 0xFF);
+ b[pos++] = (byte)((bits >> 32) & 0xFF);
+ b[pos++] = (byte)((bits >> 40) & 0xFF);
+ b[pos++] = (byte)((bits >> 48) & 0xFF);
+ b[pos++] = (byte)((bits >> 56) & 0xFF);
+ return pos;
+ }
}
Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferInputStream.java?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferInputStream.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferInputStream.java Fri Jun 5 18:31:45 2009
@@ -25,7 +25,7 @@
import java.util.List;
/** Utility to present {@link ByteBuffer} data as an {@link InputStream}.*/
-class ByteBufferInputStream extends InputStream {
+public class ByteBufferInputStream extends InputStream {
private List<ByteBuffer> buffers;
private int current;
Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferOutputStream.java?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferOutputStream.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferOutputStream.java Fri Jun 5 18:31:45 2009
@@ -24,7 +24,7 @@
/** Utility to collect data written to an {@link OutputStream} in {@link
* ByteBuffer}s.*/
-class ByteBufferOutputStream extends OutputStream {
+public class ByteBufferOutputStream extends OutputStream {
public static final int BUFFER_SIZE = 8192;
private List<ByteBuffer> buffers;
Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferValueReader.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferValueReader.java?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferValueReader.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferValueReader.java Fri Jun 5 18:31:45 2009
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.avro.ipc;
-
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import org.apache.avro.io.ValueReader;
-
-/** A {@link ValueReader} that reads from {@link ByteBuffer}s.*/
-class ByteBufferValueReader extends ValueReader {
- public ByteBufferValueReader(List<ByteBuffer> buffers) {
- super(new ByteBufferInputStream(buffers));
- }
-
- @Override
- public ByteBuffer readBuffer(Object old) throws IOException {
- if (old != null) // punt
- return super.readBuffer(old);
- return ((ByteBufferInputStream)in).readBuffer((int)readLong());
- }
-
-
-}
Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferValueWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferValueWriter.java?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferValueWriter.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferValueWriter.java Fri Jun 5 18:31:45 2009
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.avro.ipc;
-
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import org.apache.avro.io.ValueWriter;
-
-/** A {@link ValueWriter} that writes to {@link ByteBuffer}s.*/
-class ByteBufferValueWriter extends ValueWriter {
- public ByteBufferValueWriter() {
- super(new ByteBufferOutputStream());
- }
-
- @Override
- public void writeBuffer(ByteBuffer buffer) throws IOException {
- writeLong(buffer.remaining());
- ((ByteBufferOutputStream)out).writeBuffer(buffer);
- }
-
- /** Return the list of {@link ByteBuffer}s collected. */
- public List<ByteBuffer> getBufferList() {
- return ((ByteBufferOutputStream)out).getBufferList();
- }
-
-}
Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java Fri Jun 5 18:31:45 2009
@@ -57,7 +57,8 @@
ValueReader in;
Message m;
do {
- ByteBufferValueWriter out = new ByteBufferValueWriter();
+ ByteBufferOutputStream bbo = new ByteBufferOutputStream();
+ ValueWriter out = new ValueWriter(bbo);
if (!established) // if not established
writeHandshake(out); // prepend handshake
@@ -67,13 +68,14 @@
if (m == null)
throw new AvroRuntimeException("Not a local message: "+messageName);
- out.writeUtf8(new Utf8(m.getName())); // write message name
+ out.writeString(m.getName()); // write message name
writeRequest(m.getRequest(), request, out); // write request payload
List<ByteBuffer> response = // transceive
- getTransceiver().transceive(out.getBufferList());
+ getTransceiver().transceive(bbo.getBufferList());
- in = new ByteBufferValueReader(response);
+ ByteBufferInputStream bbi = new ByteBufferInputStream(response);
+ in = new ValueReader(bbi);
if (!established) // if not established
readHandshake(in); // process handshake
} while (!established);
Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java Fri Jun 5 18:31:45 2009
@@ -55,16 +55,21 @@
/** Called by a server to deserialize a request, compute and serialize
* a response or error. */
public List<ByteBuffer> respond(Transceiver transceiver) throws IOException {
- ValueReader in = new ByteBufferValueReader(transceiver.readBuffers());
- ByteBufferValueWriter out = new ByteBufferValueWriter();
+ ByteBufferInputStream bbi =
+ new ByteBufferInputStream(transceiver.readBuffers());
+
+ ValueReader in = new ValueReader(bbi);
+ ByteBufferOutputStream bbo =
+ new ByteBufferOutputStream();
+ ValueWriter out = new ValueWriter(bbo);
AvroRemoteException error = null;
try {
Protocol remote = handshake(transceiver, in, out);
if (remote == null) // handshake failed
- return out.getBufferList();
+ return bbo.getBufferList();
// read request using remote protocol specification
- String messageName = in.readUtf8(null).toString();
+ String messageName = in.readString(null).toString();
Message m = remote.getMessages().get(messageName);
if (m == null)
throw new AvroRuntimeException("No such remote message: "+messageName);
@@ -94,12 +99,13 @@
} catch (AvroRuntimeException e) { // system error
LOG.warn("system error", e);
error = new AvroRemoteException(e);
- out = new ByteBufferValueWriter();
+ bbo = new ByteBufferOutputStream();
+ out = new ValueWriter(bbo);
out.writeBoolean(true);
writeError(Protocol.SYSTEM_ERRORS, error, out);
}
- return out.getBufferList();
+ return bbo.getBufferList();
}
private SpecificDatumWriter handshakeWriter =
Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java Fri Jun 5 18:31:45 2009
@@ -66,7 +66,7 @@
}
}
- @Test
+ @Test(dependsOnMethods="testGenericWrite")
public void testGenericRead() throws IOException {
DataFileReader<Object> reader =
new DataFileReader<Object>(new SeekableFileInput(FILE),
Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGeneric.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGeneric.java?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGeneric.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGeneric.java Fri Jun 5 18:31:45 2009
@@ -135,9 +135,9 @@
new GenericData.Record(PROTOCOL.getMessages().get("echoBytes").getRequest());
ByteBuffer data = ByteBuffer.allocate(length);
random.nextBytes(data.array());
+ data.flip();
params.put("data", data);
Object echoed = requestor.request("echoBytes", params);
- data.flip();
assertEquals(data, echoed);
}
Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java Fri Jun 5 18:31:45 2009
@@ -98,8 +98,8 @@
int length = random.nextInt(1024*16);
ByteBuffer data = ByteBuffer.allocate(length);
random.nextBytes(data.array());
- ByteBuffer echoed = proxy.echoBytes(data);
data.flip();
+ ByteBuffer echoed = proxy.echoBytes(data);
assertEquals(data, echoed);
}