You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2009/06/05 20:31:45 UTC

svn commit: r782092 [1/2] - in /hadoop/avro/trunk: ./ src/java/org/apache/avro/file/ src/java/org/apache/avro/generic/ src/java/org/apache/avro/io/ src/java/org/apache/avro/ipc/ src/test/java/org/apache/avro/ src/test/java/org/apache/avro/io/

Author: cutting
Date: Fri Jun  5 18:31:45 2009
New Revision: 782092

URL: http://svn.apache.org/viewvc?rev=782092&view=rev
Log:
AVRO-25.  Add blocking value writer.  Contributed by Thiruvalluvan M. G.

Added:
    hadoop/avro/trunk/src/java/org/apache/avro/io/BlockingValueWriter.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/io/TestBlockingIO.java
Modified:
    hadoop/avro/trunk/CHANGES.txt
    hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java
    hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java
    hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java
    hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumWriter.java
    hadoop/avro/trunk/src/java/org/apache/avro/io/ValueReader.java
    hadoop/avro/trunk/src/java/org/apache/avro/io/ValueWriter.java
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferInputStream.java
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferOutputStream.java
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferValueReader.java
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferValueWriter.java
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGeneric.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/io/TestValueReader.java

Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Fri Jun  5 18:31:45 2009
@@ -29,6 +29,10 @@
 
     AVRO-42.  Add partial C++ implementation. (Scott Banachowski via cutting)
 
+    AVRO-25.  Add blocking value writer that permits arbitrarily long
+    arrays and maps to be efficiently written as sequences of blocks.
+    (Thiruvalluvan M. G. via cutting)
+
   IMPROVEMENTS
 
     AVRO-11.  Re-implement specific and reflect datum readers and

Modified: hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java Fri Jun  5 18:31:45 2009
@@ -19,6 +19,7 @@
 
 import java.io.*;
 import java.util.*;
+import java.nio.ByteBuffer;
 import java.rmi.server.UID;
 import java.security.MessageDigest;
 
@@ -47,10 +48,9 @@
   public DataFileReader(SeekableInput sin, DatumReader<D> reader)
     throws IOException {
     this.in = new SeekableBufferedInput(sin);
-    this.vin = new ValueReader(in);
 
     byte[] magic = new byte[4];
-    vin.readBytes(magic);
+    in.read(magic);
     if (!Arrays.equals(DataFileWriter.MAGIC, magic))
       throw new IOException("Not a data file.");
 
@@ -58,12 +58,18 @@
     in.seek(length-4);
     int footerSize=(in.read()<<24)+(in.read()<<16)+(in.read()<<8)+in.read();
     in.seek(length-footerSize);
-    int metaLength = (int)vin.readLong();
-    for (int i = 0; i < metaLength; i++) {        // read meta
-      String key = vin.readUtf8(null).toString();
-      byte[] value = new byte[(int)vin.readLong()];
-      vin.readBytes(value);
-      meta.put(key, value);
+    this.vin = new ValueReader(in);
+    long l = vin.readMapStart();
+    if (l > 0) {
+      do {
+        for (long i = 0; i < l; i++) {
+          String key = vin.readString(null).toString();
+          ByteBuffer value = vin.readBytes(null);
+          byte[] bb = new byte[value.remaining()];
+          value.get(bb);
+          meta.put(key, bb);
+        }
+      } while ((l = vin.mapNext()) != 0);
     }
 
     this.sync = getMeta("sync");
@@ -114,7 +120,7 @@
   }
 
   private void skipSync() throws IOException {
-    vin.readBytes(syncBuffer);
+    vin.readFixed(syncBuffer);
     if (!Arrays.equals(syncBuffer, sync))
       throw new IOException("Invalid sync!");
   }
@@ -133,7 +139,7 @@
       return;
     }
     in.seek(position);
-    vin.readBytes(syncBuffer);
+    vin.readFixed(syncBuffer);
     for (int i = 0; in.tell() < in.length(); i++) {
       int j = 0;
       for (; j < sync.length; j++) {

Modified: hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java Fri Jun  5 18:31:45 2009
@@ -145,12 +145,15 @@
   private void writeFooter() throws IOException {
     writeBlock();                               // flush any data
     setMeta("count", count);                    // update count
-    bufOut.writeLong(meta.size());              // write meta entries
+    bufOut.writeMapStart();              // write meta entries
+    bufOut.setItemCount(meta.size());
     for (Map.Entry<String,byte[]> entry : meta.entrySet()) {
-      bufOut.writeUtf8(new Utf8(entry.getKey()));
-      bufOut.writeLong(entry.getValue().length);
-      bufOut.write(entry.getValue());
+      bufOut.startItem();
+      bufOut.writeString(entry.getKey());
+      bufOut.writeBytes(entry.getValue());
     }
+    bufOut.writeMapEnd();
+    
     int size = buffer.size()+4;
     out.write(sync);
     vout.writeLong(FOOTER_BLOCK);                 // tag the block

Modified: hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java Fri Jun  5 18:31:45 2009
@@ -62,7 +62,7 @@
   protected Object read(Object old, Schema actual,
                         Schema expected, ValueReader in) throws IOException {
     if (actual.getType() == Type.UNION)           // resolve unions
-      actual = actual.getTypes().get((int)in.readLong());
+      actual = actual.getTypes().get((int)in.readIndex());
     if (expected.getType() == Type.UNION)
       expected = resolveExpected(actual, expected);
     switch (actual.getType()) {
@@ -223,7 +223,7 @@
       for (Iterator<String> i = json.getFieldNames(); i.hasNext();) {
         String key = i.next();
         addToMap(map, new Utf8(key),
-                 defaultFieldValue(null, value, json.getFieldValue(key)));
+                 defaultFieldValue(null, value, json.get(key)));
       }
       return map;
     case UNION:   return defaultFieldValue(old, schema.getTypes().get(0), json);
@@ -247,7 +247,7 @@
     String name = expected.getName();
     if (name != null && !name.equals(actual.getName()))
       throw new AvroTypeException("Expected "+expected+", found "+actual);
-    return createEnum(actual.getEnumSymbols().get(in.readInt()), expected);
+    return createEnum(actual.getEnumSymbols().get(in.readEnum()), expected);
   }
 
   /** Called to create an enum value. May be overridden for alternate enum
@@ -256,17 +256,23 @@
 
   /** Called to read an array instance.  May be overridden for alternate array
    * representations.*/
-  @SuppressWarnings(value="unchecked")
   protected Object readArray(Object old, Schema actual, Schema expected,
                              ValueReader in) throws IOException {
     Schema actualType = actual.getElementType();
     Schema expectedType = expected.getElementType();
-    long firstBlockSize = in.readLong();
-    Object array = newArray(old, (int) firstBlockSize);
-    for (long l = firstBlockSize; l > 0; l = in.readLong())
-      for (long i = 0; i < l; i++)
-        addToArray(array, read(peekArray(array), actualType, expectedType, in));
-    return array;
+    long l = in.readArrayStart();
+    if (l > 0) {
+      Object array = newArray(old, (int) l);
+      do {
+        for (long i = 0; i < l; i++) {
+          addToArray(array, read(peekArray(array), actualType, expectedType, in));  
+        }
+      } while ((l = in.arrayNext()) > 0);
+      
+      return array;
+    } else {
+      return newArray(old, 0);
+    }
   }
 
   /** Called by the default implementation of {@link #readArray} to retrieve a
@@ -286,18 +292,21 @@
   
   /** Called to read a map instance.  May be overridden for alternate map
    * representations.*/
-  @SuppressWarnings(value="unchecked")
   protected Object readMap(Object old, Schema actual, Schema expected,
                            ValueReader in) throws IOException {
     Schema aValue = actual.getValueType();
     Schema eValue = expected.getValueType();
-    int firstBlockSize = (int)in.readLong();
-    Object map = newMap(old, firstBlockSize);
-    for (long l = firstBlockSize; l > 0; l = in.readLong())
-      for (long i = 0; i < l; i++)
-        addToMap(map,
-                 readString(null, in),
-                 read(null, aValue, eValue, in));
+    long l = in.readMapStart();
+    Object map = newMap(old, (int) l);
+    if (l > 0) {
+      do {
+        for (int i = 0; i < l; i++) {
+          addToMap(map,
+              readString(null, in),
+              read(null, aValue, eValue, in));
+        }
+      } while ((l = in.mapNext()) > 0);
+    }
     return map;
   }
 
@@ -316,7 +325,7 @@
     if (!actual.equals(expected))
       throw new AvroTypeException("Expected "+expected+", found "+actual);
     GenericFixed fixed = (GenericFixed)createFixed(old, expected);
-    in.readBytes(fixed.bytes(), 0, actual.getFixedSize());
+    in.readFixed(fixed.bytes(), 0, actual.getFixedSize());
     return fixed;
   }
 
@@ -356,6 +365,7 @@
   /** Called to create new array instances.  Subclasses may override to use a
    * different array implementation.  By default, this returns a {@link
    * GenericData.Array}.*/
+  @SuppressWarnings("unchecked")
   protected Object newArray(Object old, int size) {
     if (old instanceof GenericArray) {
       ((GenericArray) old).clear();
@@ -366,6 +376,7 @@
   /** Called to create new array instances.  Subclasses may override to use a
    * different map implementation.  By default, this returns a {@link
    * HashMap}.*/
+  @SuppressWarnings("unchecked")
   protected Object newMap(Object old, int size) {
     if (old instanceof Map) {
       ((Map) old).clear();
@@ -375,9 +386,9 @@
 
   /** Called to read strings.  Subclasses may override to use a different
    * string representation.  By default, this calls {@link
-   * ValueReader#readUtf8(Object)}.*/
+   * ValueReader#readString(Utf8)}.*/
   protected Object readString(Object old, ValueReader in) throws IOException {
-    return in.readUtf8(old);
+    return in.readString((Utf8)old);
   }
 
   /** Called to create a string from a default value.  Subclasses may override
@@ -387,9 +398,9 @@
 
   /** Called to read byte arrays.  Subclasses may override to use a different
    * byte array representation.  By default, this calls {@link
-   * ValueReader#readBuffer(Object)}.*/
+   * ValueReader#readBytes(ByteBuffer)}.*/
   protected Object readBytes(Object old, ValueReader in) throws IOException {
-    return in.readBuffer(old);
+    return in.readBytes((ByteBuffer)old);
   }
 
   /** Called to create byte arrays from default values.  Subclasses may
@@ -411,29 +422,30 @@
       break;
     case ARRAY:
       Schema elementType = schema.getElementType();
-      for (int l = (int)in.readLong(); l > 0; l = (int)in.readLong())
-        for (int i = 0; i < l; i++)
+      for (long l = in.skipArray(); l > 0; l = in.skipArray()) {
+        for (long i = 0; i < l; i++) {
           skip(elementType, in);
+        }
+      }
       break;
     case MAP:
       Schema value = schema.getValueType();
-      for (int l = (int)in.readLong(); l > 0; l = (int)in.readLong())
-        for (int i = 0; i < l; i++) {
+      for (long l = in.skipMap(); l > 0; l = in.skipMap()) {
+        for (long i = 0; i < l; i++) {
           skip(STRING_SCHEMA, in);
           skip(value, in);
         }
+      }
       break;
     case UNION:
-      skip(schema.getTypes().get((int)in.readLong()), in);
+      skip(schema.getTypes().get((int)in.readIndex()), in);
       break;
     case FIXED:
-      in.skip(schema.getFixedSize());
+      in.skipFixed(schema.getFixedSize());
       break;
     case STRING:
     case BYTES:
-      long length = in.readLong();
-      while (length > 0)
-        length -= in.skip(length);
+      in.skipBytes();
       break;
     case INT:     in.readInt();           break;
     case LONG:    in.readLong();          break;

Modified: hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumWriter.java?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumWriter.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumWriter.java Fri Jun  5 18:31:45 2009
@@ -21,7 +21,6 @@
 import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Set;
 import java.util.Map.Entry;
 
 import org.apache.avro.AvroRuntimeException;
@@ -58,7 +57,7 @@
     case MAP:    writeMap(schema, datum, out);    break;
     case UNION:
       int index = resolveUnion(schema, datum);
-      out.writeLong(index);
+      out.writeIndex(index);
       write(schema.getTypes().get(index), datum, out);
       break;
     case FIXED:   writeFixed(schema, datum, out);   break;
@@ -95,7 +94,7 @@
    * representations.*/
   protected void writeEnum(Schema schema, Object datum, ValueWriter out)
     throws IOException {
-    out.writeInt(schema.getEnumOrdinal((String)datum));
+    out.writeEnum(schema.getEnumOrdinal((String)datum));
   }
   
   /** Called to write a array.  May be overridden for alternate array
@@ -104,12 +103,13 @@
     throws IOException {
     Schema element = schema.getElementType();
     long size = getArraySize(datum);
-    if (size > 0) {
-      out.writeLong(size);
-      for (Iterator<? extends Object> it = getArrayElements(datum); it.hasNext();)
-        write(element, it.next(), out);
+    out.writeArrayStart();
+    out.setItemCount(size);
+    for (Iterator<? extends Object> it = getArrayElements(datum); it.hasNext();) {
+      out.startItem();
+      write(element, it.next(), out);
     }
-    out.writeLong(0);
+    out.writeArrayEnd();
   }
 
   /** Called by the default implementation of {@link #writeArray} to get the
@@ -133,14 +133,14 @@
     throws IOException {
     Schema value = schema.getValueType();
     int size = getMapSize(datum);
-    if (size > 0) {
-      out.writeLong(size);                // write a single block
-      for (Map.Entry<Object,Object> entry : getMapEntries(datum)) {
-        writeString(entry.getKey(), out);
-        write(value, entry.getValue(), out);
-      }
+    out.writeMapStart();
+    out.setItemCount(size);
+    for (Map.Entry<Object,Object> entry : getMapEntries(datum)) {
+      out.startItem();
+      writeString(entry.getKey(), out);   // keep the overridable string hook
+      write(value, entry.getValue(), out);
     }
-    out.writeLong(0);
+    out.writeMapEnd();
   }
 
   /** Called by the default implementation of {@link #writeMap} to get the size
@@ -160,13 +160,13 @@
   /** Called to write a string.  May be overridden for alternate string
    * representations.*/
   protected void writeString(Object datum, ValueWriter out) throws IOException {
-    out.writeUtf8((Utf8)datum);
+    out.writeString((Utf8)datum);
   }
 
   /** Called to write a bytes.  May be overridden for alternate bytes
    * representations.*/
   protected void writeBytes(Object datum, ValueWriter out) throws IOException {
-    out.writeBuffer((ByteBuffer)datum);
+    out.writeBytes((ByteBuffer)datum);
   }
 
   private int resolveUnion(Schema union, Object datum) {
@@ -207,7 +207,7 @@
    * representations.*/
   protected void writeFixed(Schema schema, Object datum, ValueWriter out)
     throws IOException {
-    out.write(((GenericFixed)datum).bytes(), 0, schema.getFixedSize());
+    out.writeFixed(((GenericFixed)datum).bytes(), 0, schema.getFixedSize());
   }
 
   /** Called by the default implementation of {@link #instanceOf}.*/

Added: hadoop/avro/trunk/src/java/org/apache/avro/io/BlockingValueWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/BlockingValueWriter.java?rev=782092&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/io/BlockingValueWriter.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/io/BlockingValueWriter.java Fri Jun  5 18:31:45 2009
@@ -0,0 +1,622 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.avro.AvroTypeException;
+import org.apache.avro.Schema;
+import org.apache.avro.util.Utf8;
+
+/** A {@link ValueWriter} that writes large arrays and maps as a sequence of
+ * blocks.  So long as individual primitive values fit in memory, arbitrarily
+ * long arrays and maps may be written and subsequently read without exhausting
+ * memory.  Values are buffered until the specified block size would be
+ * exceeded, minimizing block overhead.
+ * @see ValueWriter
+ */
+public class BlockingValueWriter extends ValueWriter {
+
+ /* Implementation note:
+  *
+  * Blocking is complicated because of nesting.  If a large, nested
+  * value overflows your buffer, you've got to do a lot of dancing
+  * around to output the blocks correctly.
+  *
+  * To handle this complexity, this class keeps a stack of blocked
+  * values: each time a new block is started (e.g., by a call to
+  * {@link #writeArrayStart}), an entry is pushed onto this stack.
+  *
+  * In this stack, we keep track of the state of a block.  Blocks can
+  * be in two states.  "Regular" blocks have a non-zero byte count.
+  * "Overflow" blocks help us deal with the case where a block
+  * contains a value that's too big to buffer.  In this case, the
+  * block contains only one item, and we give it an unknown
+  * byte-count.  Because these values (1,unknown) are fixed, we're
+  * able to write the header for these overflow blocks to the
+  * underlying stream without seeing the entire block.  After writing
+  * this header, we've freed our buffer space to be fully devoted to
+  * blocking the large, inner value.
+  */
+
+  /**
+   * One entry of the writer's block stack: the bookkeeping for a single
+   * array or map that is currently being written (or, at the bottom of the
+   * stack, the ROOT marker for "not inside any array or map").
+   */
+  private static class BlockedValue {
+    public enum State {
+      /**
+       * The bottom element of our stack represents being _outside_
+       * of a blocked value.
+       */
+      ROOT,
+
+      /**
+       * Represents the "regular" case, i.e., a blocked-value whose
+       * current block is fully contained in the buffer.  In this
+       * case, {@link BlockedValue#start} points to the start of the
+       * block's _data_ -- but no room has been left for a header!
+       * When this block is terminated, its data will have to be
+       * moved over a bit to make room for the header. */
+      REGULAR,
+
+      /**
+       * Represents a blocked-value whose current block is in the
+       * overflow state.  In this case, {@link BlockedValue#start} is zero. The
+       * header for such a block has _already been written_ (we've
+       * written out a header indicating that the block has a single
+       * item, and we put a "zero" down for the byte-count to indicate
+       * that we don't know the physical length of the buffer).  Any blocks
+       *  _containing_ this block must be in the {@link #OVERFLOW}
+       *  state. */
+     OVERFLOW
+    }
+
+    /** The type of this blocked value (ARRAY or MAP). */
+    public Schema.Type type;
+
+    /** The state of this BlockedValue */
+    public State state;
+
+    /** The location in the buffer where this blocked value starts */
+    public int start;
+
+    /**
+     * The index one past the last byte for the previous item. If this
+     * is the first item, this is same as {@link #start}.
+     */
+    public int lastFullItem;
+
+    /**
+     * Number of items in this blocked value that are stored
+     * in the buffer.
+     */
+    public int items;
+
+    /** Number of items left to write*/
+    public long itemsLeftToWrite;
+
+    /** Create a ROOT instance. */
+    public BlockedValue() {
+      this.type = null;
+      this.state = BlockedValue.State.ROOT;
+      this.start = this.lastFullItem = 0;
+      this.items = 1; // Makes various assertions work out
+    }
+
+    /** Create a REGULAR instance.  (Gets changed to OVERFLOW by
+     * the enclosing writer's <code>compact()</code>.) */
+    public BlockedValue(Schema.Type type, int start) {
+      this.type = type;
+      this.state = State.REGULAR;
+      this.start = this.lastFullItem = start;
+      this.items = 0;
+    }
+
+    /**
+     * Check invariants of <code>this</code> and also the
+     * <code>BlockedValue</code> containing <code>this</code>.
+     * Every check is an <code>assert</code> statement, so nothing is
+     * verified unless assertions are enabled.
+     * @param prev the entry directly below this one on the stack;
+     *   <code>null</code> is expected only for the ROOT entry
+     * @param pos the writer's current position in its buffer
+     * @return <code>true</code> always, so that a call may itself be
+     *   wrapped in an <code>assert</code>
+     */
+    public boolean check(BlockedValue prev, int pos) {
+      assert state != State.ROOT || type == null;
+      assert (state == State.ROOT ||
+              type == Schema.Type.ARRAY || type == Schema.Type.MAP);
+
+      assert 0 <= items;
+      assert 0 != items || start == pos;         // items==0 ==> start==pos
+      assert 1 < items || start == lastFullItem; // items<=1 ==> start==lFI
+      assert items <= 1 || start <= lastFullItem; // 1<items ==> start<=lFI
+      assert lastFullItem <= pos;
+
+      switch (state) {
+      case ROOT:
+          assert start == 0;
+          assert prev == null;
+          break;
+      case REGULAR:
+          assert start >= 0;
+          assert prev.lastFullItem <= start;
+          assert 1 <= prev.items;
+          break;
+      case OVERFLOW:
+          assert start == 0;
+          assert items == 1;
+          assert prev.state == State.ROOT || prev.state == State.OVERFLOW;
+          break;
+      }
+      // Reaching this point means no assertion fired.
+      return true;
+    }
+  }
+
+  /**
+   * The buffer to hold the bytes before being written into the underlying
+   * stream.
+   */
+  private byte[] buf;
+  
+  /**
+   * Index into the location in {@link #buf}, where next byte can be written.
+   */
+  private int pos;
+  
+  /**
+   * The state stack.
+   */
+  private BlockedValue[] blockStack;
+  private int stackTop = -1;
+  private static int STACK_STEP = 10;
+
+  /**
+   * A {@link ByteArrayOutputStream} that hands out its internal array and
+   * byte count directly, so the bytes written to it can be read in place.
+   */
+  private static final class EncoderBuffer extends ByteArrayOutputStream {
+    /** Returns the stream's backing array itself, not a copy. */
+    public byte[] buffer() {
+      return this.buf;
+    }
+
+    /** Returns the number of bytes written since the last reset. */
+    public int length() {
+      return this.count;
+    }
+  }
+  
+  private EncoderBuffer encoderBuffer = new EncoderBuffer();
+
+  /**
+   * Asserts the writer-level invariants, then asks every entry on the
+   * block stack (bottom to top) to assert its own.
+   * @return <code>true</code> always, so callers can write
+   *   <code>assert check();</code>
+   */
+  private boolean check() {
+    assert out != null;
+    assert buf != null;
+    assert MIN_BUFFER_SIZE <= buf.length;
+    assert 0 <= pos;
+    assert pos <= buf.length : pos + " " + buf.length;
+
+    assert blockStack != null;
+    BlockedValue enclosing = null;
+    int depth = 0;
+    while (depth <= stackTop) {
+      final BlockedValue current = blockStack[depth++];
+      current.check(enclosing, pos);
+      enclosing = current;
+    }
+    return true;
+  }
+
+  private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+  private static final int MIN_BUFFER_SIZE = 64;
+
+  /**
+   * Creates a writer over <code>out</code> that uses the default buffer
+   * size (<code>DEFAULT_BUFFER_SIZE</code>).
+   */
+  public BlockingValueWriter(OutputStream out) {
+    this(out, DEFAULT_BUFFER_SIZE);
+  }
+
+  /**
+   * Creates a writer over <code>out</code> that buffers output in an array
+   * of <code>bufferSize</code> bytes.
+   * @param out the stream that buffered bytes are eventually written to
+   * @param bufferSize size of the internal buffer, in bytes
+   * @throws IllegalArgumentException if <code>bufferSize</code> is smaller
+   *   than <code>MIN_BUFFER_SIZE</code>
+   */
+  public BlockingValueWriter(OutputStream out, int bufferSize) {
+    super(out);
+    if (bufferSize < MIN_BUFFER_SIZE) {
+      throw new IllegalArgumentException("Buffer size too small: "
+          + bufferSize + " < " + MIN_BUFFER_SIZE);
+    }
+    this.buf = new byte[bufferSize];
+    this.pos = 0;
+    blockStack = new BlockedValue[0];
+    expandStack();
+    // Slot 0 of the freshly grown stack becomes the ROOT entry.
+    BlockedValue bv = blockStack[++stackTop];
+    bv.type = null;
+    bv.state = BlockedValue.State.ROOT;
+    bv.start = bv.lastFullItem = 0;
+    bv.items = 1;
+
+    assert check();
+  }
+
+  /**
+   * Grows the block stack by <code>STACK_STEP</code> slots, keeping the
+   * existing entries and filling each new slot with a fresh
+   * <code>BlockedValue</code> so that slots can be reused without allocation.
+   */
+  private void expandStack() {
+    final BlockedValue[] previous = blockStack;
+    final BlockedValue[] grown =
+        Arrays.copyOf(previous, previous.length + STACK_STEP);
+    blockStack = grown;
+    int slot = previous.length;
+    while (slot < grown.length) {
+      grown[slot++] = new BlockedValue();
+    }
+  }
+
+  /**
+   * Points this writer at a new stream and rewinds its own state: the
+   * buffer position returns to zero and the block stack is cut back to its
+   * bottom entry.  Bytes still held in the buffer are not written anywhere.
+   */
+  @Override
+  public void init(OutputStream out) throws IOException {
+    super.init(out);
+    this.stackTop = 0;
+    this.pos = 0;
+
+    assert check();
+  }
+
+  /**
+   * Pushes buffered bytes towards the underlying stream and then flushes
+   * that stream.  Outside of any array or map the whole buffer is written
+   * as-is; inside one, <code>compact()</code> is called until the innermost
+   * blocked value is in the OVERFLOW state.
+   */
+  @Override
+  public void flush() throws IOException {
+    BlockedValue bv = blockStack[stackTop];
+    if (bv.state == BlockedValue.State.ROOT) {
+      out.write(buf, 0, pos);
+      pos = 0;
+    } else {
+      // NOTE(review): termination relies on compact() eventually switching
+      // this entry to OVERFLOW; compact() is not visible here -- confirm.
+      while (bv.state != BlockedValue.State.OVERFLOW) {
+        compact();
+      }
+    }
+    out.flush();
+
+    assert check();
+  }
+
+  /** Appends one byte to the buffer: 1 for true, 0 for false. */
+  @Override
+  public void writeBoolean(boolean b) throws IOException {
+    if (pos + 1 > buf.length) {
+      ensure(1);
+    }
+    final byte encoded = b ? (byte) 1 : (byte) 0;
+    buf[pos++] = encoded;
+
+    assert check();
+  }
+
+  /**
+   * Makes sure 5 bytes are free in the buffer, then appends <code>n</code>
+   * using <code>encodeLong</code>.
+   */
+  @Override
+  public void writeInt(int n) throws IOException {
+    final int reserve = 5;
+    if (buf.length < pos + reserve) {
+      ensure(reserve);
+    }
+    pos = encodeLong(n, buf, pos);
+
+    assert check();
+  }
+
+  /**
+   * Makes sure 10 bytes are free in the buffer, then appends <code>n</code>
+   * using <code>encodeLong</code>.
+   */
+  @Override
+  public void writeLong(long n) throws IOException {
+    final int reserve = 10;
+    if (buf.length < pos + reserve) {
+      ensure(reserve);
+    }
+    pos = encodeLong(n, buf, pos);
+
+    assert check();
+  }
+    
+  /**
+   * Makes sure 4 bytes are free in the buffer, then appends <code>f</code>
+   * using <code>encodeFloat</code>.
+   */
+  @Override
+  public void writeFloat(float f) throws IOException {
+    final int reserve = 4;
+    if (buf.length < pos + reserve) {
+      ensure(reserve);
+    }
+    pos = encodeFloat(f, buf, pos);
+
+    assert check();
+  }
+
+  /**
+   * Makes sure 8 bytes are free in the buffer, then appends <code>d</code>
+   * using <code>encodeDouble</code>.
+   */
+  @Override
+  public void writeDouble(double d) throws IOException {
+    final int reserve = 8;
+    if (buf.length < pos + reserve) {
+      ensure(reserve);
+    }
+    pos = encodeDouble(d, buf, pos);
+
+    assert check();
+  }
+
+  /**
+   * Writes a string as a length-prefixed byte sequence, taking the first
+   * <code>getLength()</code> bytes of the array <code>getBytes()</code> returns.
+   */
+  @Override
+  public void writeString(Utf8 utf8) throws IOException {
+    final byte[] encoded = utf8.getBytes();
+    final int length = utf8.getLength();
+    writeBytes(encoded, 0, length);
+
+    assert check();
+  }
+
+  /**
+   * Writes the remaining content of <code>bytes</code> (from its position
+   * up to its limit) as a length-prefixed byte sequence.  The position of
+   * <code>bytes</code> itself is not changed.
+   */
+  @Override
+  public void writeBytes(ByteBuffer bytes) throws IOException {
+    final int len = bytes.remaining();
+    if (bytes.hasArray()) {
+      // position() is relative to arrayOffset(), which is non-zero for
+      // buffers produced by slice(); ignoring it writes the wrong bytes.
+      writeBytes(bytes.array(), bytes.arrayOffset() + bytes.position(), len);
+    } else {
+      // Direct and read-only buffers expose no array: copy the bytes out
+      // through a duplicate so the caller's position stays where it was.
+      final byte[] copy = new byte[len];
+      bytes.duplicate().get(copy);
+      writeBytes(copy, 0, len);
+    }
+
+    assert check();
+  }
+  
+  /**
+   * Writes <code>len</code> bytes of <code>bytes</code>, beginning at
+   * <code>start</code>, without any length prefix.
+   */
+  @Override
+  public void writeFixed(byte[] bytes, int start, int len) throws IOException {
+    this.doWriteBytes(bytes, start, len);
+
+    assert check();
+  }
+  
+  /**
+   * Writes <code>len</code> (via <code>encodeLong</code>, with 5 bytes
+   * reserved for it) followed by <code>len</code> bytes of
+   * <code>bytes</code> beginning at <code>start</code>.
+   */
+  @Override
+  public void writeBytes(byte[] bytes, int start, int len) throws IOException {
+    final int reserve = 5;
+    if (buf.length < pos + reserve) {
+      ensure(reserve);
+    }
+    pos = encodeLong(len, buf, pos);
+    doWriteBytes(bytes, start, len);
+
+    assert check();
+  }
+
+  /**
+   * Begins an array: pushes a REGULAR entry of type ARRAY onto the block
+   * stack, with no items yet and its data starting at the current buffer
+   * position.  The stack is grown first if it is full.
+   */
+  @Override
+  public void writeArrayStart() throws IOException {
+    final int next = stackTop + 1;
+    if (next == blockStack.length) {
+      expandStack();
+    }
+    stackTop = next;
+
+    final BlockedValue entry = blockStack[next];
+    entry.type = Schema.Type.ARRAY;
+    entry.state = BlockedValue.State.REGULAR;
+    entry.start = pos;
+    entry.lastFullItem = pos;
+    entry.items = 0;
+
+    assert check();
+  }
+
+  /**
+   * Records how many items are about to be written to the innermost array
+   * or map.  With assertions enabled, it is an error to call this outside
+   * an array or map, or while earlier announced items are still unwritten.
+   */
+  @Override
+  public void setItemCount(long itemCount) throws IOException {
+    final BlockedValue top = blockStack[stackTop];
+    assert top.type == Schema.Type.ARRAY || top.type == Schema.Type.MAP;
+    assert top.itemsLeftToWrite == 0;
+    top.itemsLeftToWrite = itemCount;
+
+    assert check();
+  }
+  
+  /**
+   * Marks the beginning of the next item of the innermost array or map.
+   * An entry in the OVERFLOW state is first returned to REGULAR; then the
+   * item is counted and the current position recorded as its start.
+   */
+  @Override
+  public void startItem() throws IOException {
+    // finishOverflow() leaves the stack depth alone, so one lookup suffices.
+    final BlockedValue top = blockStack[stackTop];
+    if (top.state == BlockedValue.State.OVERFLOW) {
+      finishOverflow();
+    }
+    top.itemsLeftToWrite--;
+    top.lastFullItem = pos;
+    top.items++;
+
+    assert check();
+  }
+
+  /**
+   * Ends the innermost array.
+   * @throws AvroTypeException if the innermost blocked value is not an
+   *   array, or if the number of items started differs from the count
+   *   given to {@link #setItemCount}
+   */
+  @Override
+  public void writeArrayEnd() throws IOException {
+    final BlockedValue current = blockStack[stackTop];
+    final boolean inArray = current.type == Schema.Type.ARRAY;
+    if (!inArray) {
+      throw new AvroTypeException("Called writeArrayEnd outside of an array.");
+    }
+    final long outstanding = current.itemsLeftToWrite;
+    if (outstanding != 0) {
+      throw new AvroTypeException("Failed to write expected number of array elements.");
+    }
+    endBlockedValue();
+
+    assert check();
+  }
+
+  /**
+   * Begins a map: pushes a REGULAR entry of type MAP onto the block stack,
+   * with no items yet and its data starting at the current buffer
+   * position.  The stack is grown first if it is full.
+   */
+  @Override
+  public void writeMapStart() throws IOException {
+    final int next = stackTop + 1;
+    if (next == blockStack.length) {
+      expandStack();
+    }
+    stackTop = next;
+
+    final BlockedValue entry = blockStack[next];
+    entry.type = Schema.Type.MAP;
+    entry.state = BlockedValue.State.REGULAR;
+    entry.start = pos;
+    entry.lastFullItem = pos;
+    entry.items = 0;
+
+    assert check();
+  }
+
+  /**
+   * Ends the innermost map.
+   * @throws AvroTypeException if the innermost blocked value is not a map,
+   *   or if the number of items started differs from the count given to
+   *   {@link #setItemCount}
+   */
+  @Override
+  public void writeMapEnd() throws IOException {
+    BlockedValue top = blockStack[stackTop];
+    if (top.type != Schema.Type.MAP) {
+      throw new AvroTypeException("Called writeMapEnd outside of a map.");
+    }
+    if (top.itemsLeftToWrite != 0) {
+      throw new AvroTypeException("Failed to write expected number of map elements.");
+    }
+    endBlockedValue();
+
+    assert check();
+  }
+
+  /**
+   * Makes sure 5 bytes are free in the buffer, then appends
+   * <code>unionIndex</code> using <code>encodeLong</code>.
+   */
+  @Override
+  public void writeIndex(int unionIndex) throws IOException {
+    final int reserve = 5;
+    if (buf.length < pos + reserve) {
+      ensure(reserve);
+    }
+    pos = encodeLong(unionIndex, buf, pos);
+
+    assert check();
+  }
+
+  /**
+   * Closes the innermost array or map: emits the header (negated item
+   * count, then byte count) for its last buffered block if that block has
+   * any items, pops the entry off the stack, and appends the zero byte that
+   * terminates the blocked value.  If that leaves the writer at ROOT, the
+   * output is flushed.
+   */
+  private void endBlockedValue() throws IOException {
+    // Loops only when compact() had to be called to make room for a header.
+    for (; ;) {
+      assert check();
+      BlockedValue t = blockStack[stackTop];
+      assert t.state != BlockedValue.State.ROOT;
+      if (t.state == BlockedValue.State.OVERFLOW) {
+        finishOverflow();
+      }
+      assert t.state == BlockedValue.State.REGULAR;
+      if (0 < t.items) {
+        int byteCount = pos - t.start;
+        if (t.start == 0 &&
+          blockStack[stackTop - 1].state
+            != BlockedValue.State.REGULAR) { // Lucky us -- don't have to move
+          // The block's data begins at buf[0] and the parent is not being
+          // buffered, so the header can go straight to the stream ahead of
+          // the data that is still sitting in the buffer.
+          encodeLong(-t.items, out);
+          encodeLong(byteCount, out);
+        } else {
+          // Encode the header on the side first to learn its size.
+          encodeLong(-t.items, encoderBuffer);
+          encodeLong(byteCount, encoderBuffer);
+          final int headerSize = encoderBuffer.length();
+          if (buf.length >= pos + headerSize) {
+            // Shift the block's data right by headerSize and drop the
+            // header into the gap in front of it.
+            pos += headerSize;
+            final int m = t.start;
+            System.arraycopy(buf, m, buf, m + headerSize, byteCount);
+            System.arraycopy(encoderBuffer.buffer(), 0, buf, m, headerSize);
+            encoderBuffer.reset();
+          } else {
+            // No room for the header: discard it, compact, and retry.
+            // NOTE(review): assumes compact() frees space or changes this
+            // entry's state; compact() is not visible here -- confirm.
+            encoderBuffer.reset();
+            compact();
+            continue;
+          }
+        }
+      }
+      stackTop--;
+      if (buf.length < (pos + 1)) ensure(1);
+      buf[pos++] = 0;   // Sentinel for last block in a blocked value
+      assert check();
+      if (blockStack[stackTop].state == BlockedValue.State.ROOT) {
+        flush();
+      }
+      return;
+    }
+  }
+
+  /**
+   * Called when we've finished writing the last item in an overflow
+   * buffer.  When this is finished, the top of the stack will be
+   * an empty block in the "regular" state.
+   * @throws IOException if writing the buffered bytes to <tt>out</tt> fails
+   */
+  private void finishOverflow() throws IOException {
+    BlockedValue s = blockStack[stackTop];
+    if (s.state != BlockedValue.State.OVERFLOW) {
+      throw new IllegalStateException("Not an overflow block");
+    }
+    assert check();
+
+    // Flush any remaining data for this block
+    out.write(buf, 0, pos);
+    pos = 0;
+
+    // Reset top of stack to be in REGULAR mode
+    s.state = BlockedValue.State.REGULAR;
+    s.start = s.lastFullItem = 0;
+    s.items = 0;
+    assert check();
+  }
+
+  private void ensure(int l) throws IOException {
+    if (buf.length < l) {
+      throw new IllegalArgumentException("Too big: " + l);
+    }
+    while (buf.length < (pos + l)) {
+      if (blockStack[stackTop].state == BlockedValue.State.REGULAR) {
+        compact();
+      } else {
+        out.write(buf, 0, pos);
+        pos = 0;
+      }
+    }
+  }
+
+  private void doWriteBytes(byte[] bytes, int start, int len)
+    throws IOException {
+    if (len < buf.length) {
+      ensure(len);
+      System.arraycopy(bytes, start, buf, pos, len);
+      pos += len;
+    } else {
+      ensure(buf.length);
+      assert blockStack[stackTop].state == BlockedValue.State.ROOT ||
+        blockStack[stackTop].state == BlockedValue.State.OVERFLOW;
+      write(bytes, start, len);
+    }
+    assert check();
+  }
+
+  private void write(byte b[], int off, int len) throws IOException {
+    if (blockStack[stackTop].state == BlockedValue.State.ROOT) {
+      out.write(b, off, len);
+    } else {
+      assert check();
+      while (buf.length < (pos + len)) {
+        if (blockStack[stackTop].state == BlockedValue.State.REGULAR) {
+          compact();
+        } else {
+          out.write(buf, 0, pos);
+          pos = 0;
+          if (buf.length <= len) {
+            out.write(b, off, len);
+            len = 0;
+          }
+        }
+      }
+      System.arraycopy(b, off, buf, pos, len);
+      pos += len;
+      assert check();
+    }
+  }
+
+  /** Only call if there are REGULAR-state values on the stack. */
+  private void compact() throws IOException {
+    assert check();
+
+    // Find first REGULAR-state value
+    BlockedValue s = null;
+    int i;
+    for (i = 1; i <= stackTop; i++) {
+      s = blockStack[i];
+      if (s.state == BlockedValue.State.REGULAR) break;
+    }
+    assert s != null;
+
+    // We're going to transition "s" into the overflow state.  To do
+    // this, we're going to flush any bytes prior to "s", then write
+    // any full items of "s" into a block, start an overflow
+    // block, write any remaining bytes of "s" up to the start of the
+    // next more deeply-nested blocked-value, and finally move over
+    // any remaining bytes (which will be from more deeply-nested
+    // blocked values).
+
+    // Flush any bytes prior to "s"
+    out.write(buf, 0, s.start);
+
+    // Write any full items of "s"
+    if (1 < s.items) {
+      encodeLong(-(s.items - 1), out);
+      encodeLong(s.lastFullItem - s.start, out);
+      out.write(buf, s.start, s.lastFullItem - s.start);
+      s.start = s.lastFullItem;
+      s.items = 1;
+    }
+
+    // Start an overflow block for s
+    encodeLong(1, out);
+
+    // Write any remaining bytes for "s", up to the next-most
+    // deeply-nested value
+    BlockedValue n = ((i + 1) <= stackTop ?
+        blockStack[i + 1] : null);
+    int end = (n == null ? pos : n.start);
+    out.write(buf, s.lastFullItem, end - s.lastFullItem);
+
+    // Move over any bytes that remain (and adjust indices)
+    System.arraycopy(buf, end, buf, 0, pos - end);
+    for (int j = i + 1; j <= stackTop; j++) {
+        n = blockStack[j];
+        n.start -= end;
+        n.lastFullItem -= end;
+    }
+    pos -= end;
+
+    assert s.items == 1;
+    s.start = s.lastFullItem = 0;
+    s.state = BlockedValue.State.OVERFLOW;
+
+    assert check();
+  }
+}

Modified: hadoop/avro/trunk/src/java/org/apache/avro/io/ValueReader.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/ValueReader.java?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/io/ValueReader.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/io/ValueReader.java Fri Jun  5 18:31:45 2009
@@ -18,117 +18,454 @@
 package org.apache.avro.io;
 
 import java.io.EOFException;
-import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 
+import org.apache.avro.AvroTypeException;
+import org.apache.avro.ipc.ByteBufferInputStream;
 import org.apache.avro.util.Utf8;
 
-/** Read leaf values.
- * <p>Has no state except that of the OutputStream it wraps.
- * <p>Used by {@link DatumReader} implementations to read datum leaf values.
- * @see ValueWriter
+/**
+ * Low-level support for de-serializing Avro values.
+ *
+ *  This class has two types of methods.  One type of methods support
+ *  the reading of leaf values (for example, {@link #readLong} and
+ *  {@link #readString}).
+ *
+ *  The other type of methods support the reading of maps and arrays.
+ *  These methods are {@link #readArrayStart}, {@link #arrayNext}
+ *  (and similar methods for maps).  See {@link #readArrayStart} for
+ *  details on these methods.
+ *
+ *  @see ValueWriter
  */
-public class ValueReader extends FilterInputStream {
+
+public class ValueReader {
+  private InputStream in;
+  
+  private class ByteReader {
+    public ByteBuffer read(ByteBuffer old, int length) throws IOException {
+      ByteBuffer result = old;
+      if (old != null && old.hasArray() && length <= old.capacity()) {
+        result.clear();  // reuse the caller's buffer
+      } else {
+        result = ByteBuffer.allocate(length);
+      }
+      // array() is the whole backing array; arrayOffset() locates this buffer in it.
+      doReadBytes(result.array(), result.arrayOffset() + result.position(), length);
+      result.limit(length);
+      return result;
+    }
+  }
+  
+  private class ReuseByteReader extends ByteReader {
+    private final ByteBufferInputStream bbi;
+    
+    public ReuseByteReader(ByteBufferInputStream bbi) {
+      this.bbi = bbi;
+    }
+    
+    @Override
+    public ByteBuffer read(ByteBuffer old, int length) throws IOException {
+      if (old != null) {
+        return super.read(old, length);
+      } else {
+        return bbi.readBuffer(length);
+      }
+    }
+    
+  }
+  
+  private final ByteReader byteReader;
+
   public ValueReader(InputStream in) {
-    super(in);
+    this.in = in;
+    byteReader = (in instanceof ByteBufferInputStream) ?
+        new ReuseByteReader((ByteBufferInputStream) in) : new ByteReader();
   }
   
-  /** Same contract as {@link InputStream#read()}, except that EOFException is
-   * throw when EOF reached rather than returning -1.
-   * @throws EOFException if at EOF. */
-  public int read() throws IOException {
-      int value = in.read();
-      if (value < 0) throw new EOFException();
-      return value;
-  }
-
-  /** Read a string written by {@link ValueWriter#writeUtf8(Utf8)}.
-   * @throws EOFException if EOF is reached before reading all the bytes. */
-  public Utf8 readUtf8(Object old) throws IOException {
-    Utf8 utf8 = old instanceof Utf8 ? (Utf8)old : new Utf8();
-    utf8.setLength((int)readLong());
-    readBytes(utf8.getBytes(), 0, utf8.getLength());
-    return utf8;
-  }
-  /** Read buffer written by {@link ValueWriter#writeBuffer(ByteBuffer)}.
-   * @throws EOFException if EOF is reached before reading all the bytes. */
-  public ByteBuffer readBuffer(Object old) throws IOException {
-    int length = (int)readLong();
-    ByteBuffer bytes;
-    if ((old instanceof ByteBuffer) && ((ByteBuffer)old).capacity() >= length) {
-      bytes = (ByteBuffer)old;
-      bytes.clear();
-    } else {
-      bytes = ByteBuffer.allocate(length);
-    }
-    readBytes(bytes.array(), 0, length);
-    bytes.limit(length);
-    return bytes;
+  /** Start reading against a different input stream.  Stateful
+    * subclasses will reset their states to their initial state. */
+  public void init(InputStream in) {
+    this.in = in;
+  }
+
+  /**
+   * "Reads" a null value.  (Doesn't actually read anything, but
+   * advances the state of the parser if the implementation is
+   * stateful.)
+   *  @throws AvroTypeException If this is a stateful reader and
+   *          null is not the type of the next value to be read
+   */
+  public void readNull() throws IOException { }
+
+  /**
+   * Reads a boolean value written by {@link ValueWriter#writeBoolean}.
+   * @throws AvroTypeException If this is a stateful reader and
+   * boolean is not the type of the next value to be read
+   */
+
+  public boolean readBoolean() throws IOException {
+    int n = in.read();
+    if (n < 0) {
+      throw new EOFException();
+    }
+    return n == 1;
   }
 
-  /** Read an int written by {@link ValueWriter#writeInt(int)}.
-   * @throws EOFException if EOF is reached before reading all the bytes.*/
+  /**
+   * Reads an integer written by {@link ValueWriter#writeInt}.
+   * @throws AvroTypeException If encoded value is larger than
+   *          32-bits
+   * @throws AvroTypeException If this is a stateful reader and
+   *          int is not the type of the next value to be read
+   */
   public int readInt() throws IOException {
-    return (int)readLong();
+    long result = readLong();
+    if (result < Integer.MIN_VALUE || Integer.MAX_VALUE < result) {
+      throw new AvroTypeException("Integer overflow.");
+    }
+    return (int)result;
   }
 
-  /** Read a long written by {@link ValueWriter#writeLong(long)}.
-   * @throws EOFException if EOF is reached before reading all the bytes. */
+  /**
+   * Reads a long written by {@link ValueWriter#writeLong}.
+   * @throws AvroTypeException If this is a stateful reader and
+   *          long is not the type of the next value to be read
+   */
   public long readLong() throws IOException {
-    long b = read();
-    long n = b & 0x7F;
-    for (int shift = 7; (b & 0x80) != 0; shift += 7) {
-      b = read();
-      n |= (b & 0x7F) << shift;
+    long n = 0;
+    for (int shift = 0; ; shift += 7) {
+      long b = in.read();
+      if (b >= 0) {
+         n |= (b & 0x7F) << shift;
+         if ((b & 0x80) == 0) {
+           break;
+         }
+      } else {
+        throw new EOFException();
+      }
     }
-    return (n >>> 1) ^ -(n & 1);                  // back to two's-complement
+    return (n >>> 1) ^ -(n & 1); // back to two's-complement
   }
 
-  /** Read a float written by {@link ValueWriter#writeFloat(float)}.
-   * @throws EOFException if EOF is reached before reading all the bytes. */
+  /**
+   * Reads a float written by {@link ValueWriter#writeFloat}.
+   * @throws AvroTypeException If this is a stateful reader and
+   * float is not the type of the next value to be read
+   */
   public float readFloat() throws IOException {
-      return Float.intBitsToFloat(((read() & 0xff)      ) |
-                                  ((read() & 0xff) <<  8) |
-                                  ((read() & 0xff) << 16) |
-                                  ((read() & 0xff) << 24));
+    int n = 0;
+    for (int i = 0, shift = 0; i < 4; i++, shift += 8) {
+      int k = in.read();
+      if (k >= 0) {
+        n |= (k & 0xff) << shift;
+      } else {
+        throw new EOFException();
+      }
+    }
+    return Float.intBitsToFloat(n);
   }
 
-  /** Read a double written by {@link ValueWriter#writeDouble(double)}.
-   * @throws EOFException if EOF is reached before reading all the bytes. */
+  /**
+   * Reads a double written by {@link ValueWriter#writeDouble}.
+   * @throws AvroTypeException If this is a stateful reader and
+   *          double is not the type of the next value to be read
+   */
   public double readDouble() throws IOException {
-      return Double.longBitsToDouble(((read() & 0xffL)      ) |
-                                     ((read() & 0xffL) <<  8) |
-                                     ((read() & 0xffL) << 16) |
-                                     ((read() & 0xffL) << 24) |
-                                     ((read() & 0xffL) << 32) |
-                                     ((read() & 0xffL) << 40) |
-                                     ((read() & 0xffL) << 48) |
-                                     ((read() & 0xffL) << 56));
+    long n = 0;
+    for (int i = 0, shift = 0; i < 8; i++, shift += 8) {
+      long k = in.read();
+      if (k >= 0) {
+        n |= (k & 0xff) << shift;
+      } else {
+        throw new EOFException();
+      }
+    }
+    return Double.longBitsToDouble(n);
   }
-
-  /** Read a boolean written by {@link ValueWriter#writeBoolean(boolean)}.
-   * @throws EOFException if EOF is reached before reading all the bytes. */
-  public boolean readBoolean() throws IOException {
-    return read() == 1;
+    
+  /**
+   * Reads a char-string written by {@link ValueWriter#writeString}.
+   * @throws AvroTypeException If this is a stateful reader and
+   * char-string is not the type of the next value to be read
+   */
+  public Utf8 readString(Utf8 old) throws IOException {
+    int length = readInt();
+    Utf8 result = (old != null ? old : new Utf8());
+    result.setLength(length);
+    doReadBytes(result.getBytes(), 0, length);
+    return result;
+  }
+    
+  /**
+   * Discards a char-string written by {@link ValueWriter#writeString}.
+   *  @throws AvroTypeException If this is a stateful reader and
+   *          char-string is not the type of the next value to be read
+   */
+  public void skipString() throws IOException {
+    doSkipBytes(readInt());
+  }
+
+  /**
+   * Reads a byte-string written by {@link ValueWriter#writeBytes}.
+   * if <tt>old</tt> is not null and has sufficient capacity to take in
+   * the bytes being read, the bytes are returned in <tt>old</tt>.
+   * @throws AvroTypeException If this is a stateful reader and
+   *          byte-string is not the type of the next value to be read
+   */
+  public ByteBuffer readBytes(ByteBuffer old) throws IOException {
+    int length = readInt();
+    return byteReader.read(old, length);
+  }
+
+  /**
+   * Discards a byte-string written by {@link ValueWriter#writeBytes}.
+   *  @throws AvroTypeException If this is a stateful reader and
+   *          byte-string is not the type of the next value to be read
+   */
+  public void skipBytes() throws IOException {
+    doSkipBytes(readInt());
+  }
+  
+  /**
+   * Reads fixed sized binary object.
+   * @param bytes The buffer to store the contents being read.
+   * @param start The position where the data needs to be written.
+   * @param length  The size of the binary object.
+   * @throws AvroTypeException If this is a stateful reader and
+   *          fixed sized binary object is not the type of the next
+   *          value to be read or the length is incorrect.
+   * @throws IOException
+   */
+  public void readFixed(byte[] bytes, int start, int length)
+    throws IOException {
+    doReadBytes(bytes, start, length);
   }
 
-  /** Read bytes into an array.
-   * @throws EOFException if EOF is reached before reading all the bytes. */
-  public void readBytes(byte[] buffer) throws IOException {
-    readBytes(buffer, 0, buffer.length);
+  /**
+   * A shorthand for <tt>readFixed(bytes, 0, bytes.length)</tt>.
+   * @throws AvroTypeException If this is a stateful reader and
+   *          fixed sized binary object is not the type of the next
+   *          value to be read or the length is incorrect.
+   * @throws IOException
+   */
+  public void readFixed(byte[] bytes) throws IOException {
+    readFixed(bytes, 0, bytes.length);
+  }
+  
+  /**
+   * Discards fixed sized binary object.
+   * @param length  The size of the binary object to be skipped.
+   * @throws AvroTypeException If this is a stateful reader and
+   *          fixed sized binary object is not the type of the next
+   *          value to be read or the length is incorrect.
+   * @throws IOException
+   */
+  public void skipFixed(int length) throws IOException {
+    doSkipBytes(length);
+  }
+
+  /**
+   * Reads an enumeration.
+   * @return The enumeration's value.
+   * @throws AvroTypeException If this is a stateful reader and
+   *          enumeration is not the type of the next value to be read.
+   * @throws IOException
+   */
+  public int readEnum() throws IOException {
+    return readInt();
+  }
+  
+  private void doSkipBytes(long length) throws IOException, EOFException {
+    while (length > 0) {
+      long n = in.skip(length);
+      // skip() may skip nothing before EOF; only a failed read() proves EOF.
+      if (n <= 0 && in.read() < 0) throw new EOFException();
+      if (n <= 0) n = 1;  // the probing read() consumed one byte
+      length -= n;
+    }
   }
-  /** Read bytes into an array.
-   * @throws EOFException if EOF is reached before reading all the bytes. */
-  public void readBytes(byte[] buffer, int offset, int length)
+  
+  /**
+   * Reads <tt>length</tt> bytes into <tt>bytes</tt> starting at
+   * <tt>start</tt>. 
+   * @throws EOFException  If there are not enough number of bytes in
+   * the stream.
+   * @throws IOException
+   */
+  private void doReadBytes(byte[] bytes, int start, int length)
     throws IOException {
-    int total = 0;
-    while (total < length) {
-      int n = read(buffer, offset+total, length-total);
+    while (length > 0) {
+      int n = in.read(bytes, start, length);
       if (n < 0) throw new EOFException();
-      total += n;
+      start += n;
+      length -= n;
+    }
+  }
+
+  /**
+   * Returns the number of items to follow in the current array or map.
+   * Returns 0 if there are no more items in the current array/map and it
+   * has ended.
+   * @return the item count of the next block, or 0 at the end of the array/map
+   * @throws IOException if the count cannot be read from the stream
+   */
+  private long doReadItemCount() throws IOException {
+    long result = readLong();
+    if (result < 0) {
+      readLong(); // Consume byte-count if present
+      result = -result;
     }
+    return result;
   }
 
+  /**
+   * Reads the count of items in the current array or map and skips those
+   * items, if possible. If it could skip the items, keep repeating until
+   * there are no more items left in the array or map. If items cannot be
+   * skipped (because byte count to skip is not found in the stream)
+   * return the count of the items found. The client needs to skip the
+   * items individually.
+   * @return  Zero if there are no more items to skip and end of array/map
+   * is reached. Positive number if some items are found that cannot be
+   * skipped and the client needs to skip them individually.
+   * @throws IOException if a count cannot be read or the bytes cannot be skipped
+   */
+  private long doSkipItems() throws IOException {
+    long result = readLong();  // block counts are longs, as in doReadItemCount
+    while (result < 0) {       // negative count: the block's byte size follows
+      long bytecount = readLong();
+      doSkipBytes(bytecount);
+      result = readLong();
+    }
+    return result;
+  }
+
+  /**
+   * Reads and returns the size of the first block of an array.  If
+   * this method returns non-zero, then the caller should read the
+   * indicated number of items, and then call {@link
+   * #arrayNext} to find out the number of items in the next
+   * block.  The typical pattern for consuming an array looks like:
+   * <pre>
+   *   for(long i = in.readArrayStart(); i != 0; i = in.arrayNext()) {
+   *     for (long j = 0; j < i; j++) {
+   *       read next element of the array;
+   *     }
+   *   }
+   * </pre>
+   *  @throws AvroTypeException If this is a stateful reader and
+   *          array is not the type of the next value to be read */
+  public long readArrayStart() throws IOException {
+    return doReadItemCount();
+  }
+
+  /**
+   * Processes the next block of an array and returns the number of items in
+   * the block and lets the caller
+   * read those items.
+   * @throws AvroTypeException When called outside of an
+   *         array context
+   */
+  public long arrayNext() throws IOException {
+    return doReadItemCount();
+  }
+
+  /**
+   * Used for quickly skipping through an array.  Note you can
+   * either skip the entire array, or read the entire array (with
+   * {@link #readArrayStart}), but you can't mix the two on the
+   * same array.
+   *
+   * This method will skip through as many items as it can, all of
+   * them if possible.  It will return zero if there are no more
+   * items to skip through, or an item count if it needs the client's
+   * help in skipping.  The typical usage pattern is:
+   * <pre>
+   *   for(long i = in.skipArray(); i != 0; i = in.skipArray()) {
+   *     for (long j = 0; j < i; j++) {
+   *       read and discard the next element of the array;
+   *     }
+   *   }
+   * </pre>
+   * Note that this method can automatically skip through items if a
+   * byte-count is found in the underlying data, or if a schema has
+   * been provided to the implementation, but
+   * otherwise the client will have to skip through items itself.
+   *
+   *  @throws AvroTypeException If this is a stateful reader and
+   *          array is not the type of the next value to be read
+   */
+  public long skipArray() throws IOException {
+    return doSkipItems();
+  }
+
+  /**
+   * Reads and returns the size of the next block of map-entries.
+   * Similar to {@link #readArrayStart}.
+   *
+   *  As an example, let's say you want to read a map of records,
+   *  the record consisting of a Long field and a Boolean field.
+   *  Your code would look something like this:
+   * <pre>
+   *   Map<String,Record> m = new HashMap<String,Record>();
+   *   Record reuse = new Record();
+   *   for(long i = in.readMapStart(); i != 0; i = in.mapNext()) {
+   *     for (long j = 0; j < i; j++) {
+   *       String key = in.readString(null).toString();
+   *       reuse.longField = in.readLong();
+   *       reuse.boolField = in.readBoolean();
+   *       m.put(key, reuse);
+   *     }
+   *   }
+   * </pre>
+   * @throws AvroTypeException If this is a stateful reader and
+   *         map is not the type of the next value to be read
+   */
+  public long readMapStart() throws IOException {
+    return doReadItemCount();
+  }
+
+  /**
+   * Processes the next block of map entries and returns the count of them.
+   * Similar to {@link #arrayNext}.  See {@link #readMapStart} for details.
+   * @throws AvroTypeException When called outside of a
+   *         map context
+   */
+  public long mapNext() throws IOException {
+    return doReadItemCount();
+  }
+
+  /**
+   * Support for quickly skipping through a map similar to {@link #skipArray}.
+   *
+   * As an example, let's say you want to skip a map of records,
+   * the record consisting of a Long field and a Boolean field.
+   * Your code would look something like this:
+   * <pre>
+   *   for(long i = in.skipMap(); i != 0; i = in.skipMap()) {
+   *     for (long j = 0; j < i; j++) {
+   *       in.skipString();  // Discard key
+   *       in.readLong(); // Discard long-field of value
+   *       in.readBoolean(); // Discard boolean-field of value
+   *     }
+   *   }
+   * </pre>
+   *  @throws AvroTypeException If this is a stateful reader and
+   *          map is not the type of the next value to be read */
+
+  public long skipMap() throws IOException {
+    return doSkipItems();
+  }
+
+  /**
+   * Reads the tag of a union written by {@link ValueWriter#writeIndex}.
+   * @throws AvroTypeException If this is a stateful reader and
+   *         union is not the type of the next value to be read
+   */
+  public int readIndex() throws IOException {
+    return readInt();
+  }
 }

Modified: hadoop/avro/trunk/src/java/org/apache/avro/io/ValueWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/ValueWriter.java?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/io/ValueWriter.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/io/ValueWriter.java Fri Jun  5 18:31:45 2009
@@ -17,75 +17,404 @@
  */
 package org.apache.avro.io;
 
-import java.io.*;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
+
+import org.apache.avro.AvroTypeException;
+import org.apache.avro.ipc.ByteBufferOutputStream;
 import org.apache.avro.util.Utf8;
 
-/** Write leaf values.
- * <p>Has no state except that of the OutputStream it wraps.
- * <p>Used by {@link DatumWriter} implementations to write datum leaf values.
- * @see ValueReader
+/**
+ * Low-level support for serializing Avro values.
+ *
+ * This class has two types of methods.  One type of methods support
+ * the writing of leaf values (for example, {@link #writeLong} and
+ * {@link #writeString}).  These methods have analogs in {@link
+ * ValueReader}.
+ *
+ * The other type of methods support the writing of maps and arrays.
+ * These methods are {@link #writeArrayStart}, {@link
+ * #startItem}, and {@link #writeArrayEnd} (and similar methods for
+ * maps).  Some implementations of {@link ValueWriter} handle the
+ * buffering required to break large maps and arrays into blocks,
+ * which is necessary for applications that want to do streaming.
+ * (See {@link #writeArrayStart} for details on these methods.)
+ *
+ *  @see ValueReader
  */
-public class ValueWriter extends FilterOutputStream {
+public class ValueWriter {
+  protected OutputStream out;
+  
+  private interface ByteWriter {
+    public void write(ByteBuffer bytes) throws IOException;
+  }
+  
+  private static final class SimpleByteWriter implements ByteWriter {
+    private final OutputStream out;
+
+    public SimpleByteWriter(OutputStream out) {
+      this.out = out;
+    }
+
+    @Override
+    public void write(ByteBuffer bytes) throws IOException {
+      encodeLong(bytes.remaining(), out);  // length prefix, then the payload
+      out.write(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining());
+    }
+  }
+
+  private static final class ReuseByteWriter implements ByteWriter {
+    private final ByteBufferOutputStream bbout;
+    public ReuseByteWriter(ByteBufferOutputStream bbout) {
+      this.bbout = bbout;
+    }
+
+    @Override
+    public void write(ByteBuffer bytes) throws IOException {
+      encodeLong(bytes.remaining(), bbout);
+      bbout.writeBuffer(bytes);
+    }
+  }
+  
+  private final ByteWriter byteWriter;
+
+  /** Create a writer that sends its output to the underlying stream
+   *  <code>out</code>. */
   public ValueWriter(OutputStream out) {
-    super(out);
+    this.out = out;
+    this.byteWriter = (out instanceof ByteBufferOutputStream) ?
+        new ReuseByteWriter((ByteBufferOutputStream) out) :
+          new SimpleByteWriter(out);
   }
-  /** Write a string as {@link #writeLong(long)}-prefixed UTF-8. */
-  public void writeUtf8(Utf8 utf8) throws IOException {
-    writeLong(utf8.getLength());
-    out.write(utf8.getBytes(), 0, utf8.getLength());
+
+  /** Redirect output (and reset the parser state if we're checking). */
+  public void init(OutputStream out) throws IOException {
+    flush();
+    this.out = out;
+  }
+
+  /**
+   * Writes any buffered output to the underlying stream.
+   */
+  public void flush() throws IOException {
+    out.flush();
   }
-  /** Write a buffer of bytes. */
-  public void writeBuffer(ByteBuffer bytes) throws IOException {
-    writeLong(bytes.remaining());
-    out.write(bytes.array(), bytes.position(), bytes.remaining());
-  }
-  /** Write an int using 1-5 bytes.  The sign is moved to the low-order bit,
-   * and then the value is written so that the high-order bit of each byte
-   * indicates whether more bytes remain. */
-  public void writeInt(int n) throws IOException { writeLong(n); }
-
-  /** Write a long using 1-10 bytes.  The sign is moved to the low-order bit,
-   * and then the value is written so that the high-order bit of each byte
-   * indicates whether more bytes remain. */
+
+  /**
+   * "Writes" a null value.  (Doesn't actually write anything, but
+   * advances the state of the parser if this class is stateful.)
+   * @throws AvroTypeException If this is a stateful writer and a
+   *         null is not expected
+   */
+  public void writeNull() throws IOException { }
+  
+  /**
+   * Write a boolean value.
+   * @throws AvroTypeException If this is a stateful writer and a
+   * boolean is not expected
+   */
+  public void writeBoolean(boolean b) throws IOException {
+    out.write(b ? 1 : 0);
+  }
+
+  /**
+   * Writes a 32-bit integer.
+   * @throws AvroTypeException If this is a stateful writer and an
+   * integer is not expected
+   */
+  public void writeInt(int n) throws IOException {
+    encodeLong(n, out);
+  }
+
+  /**
+   * Write a 64-bit integer.
+   * @throws AvroTypeException If this is a stateful writer and a
+   * long is not expected
+   */
   public void writeLong(long n) throws IOException {
-    n = (n << 1) ^ (n >> 63);                     // move sign to low-order bit
+    encodeLong(n, out);
+  }
+  
+  /** Write a float.
+   * @throws IOException 
+   * @throws AvroTypeException If this is a stateful writer and a
+   * float is not expected
+   */
+  public void writeFloat(float f) throws IOException {
+    encodeFloat(f, out);
+  }
+
+  /**
+   * Write a double.
+   * @throws AvroTypeException If this is a stateful writer and a
+   * double is not expected
+   */
+  public void writeDouble(double d) throws IOException {
+    encodeDouble(d, out);
+  }
+
+  /**
+   * Write a Unicode character string.
+   * @throws AvroTypeException If this is a stateful writer and a
+   * char-string is not expected
+   */
+  public void writeString(Utf8 utf8) throws IOException {
+    encodeLong(utf8.getLength(), out);
+    out.write(utf8.getBytes(), 0, utf8.getLength());
+  }
+
+  /**
+   * Write a Unicode character string.
+   * @throws AvroTypeException If this is a stateful writer and a
+   * char-string is not expected
+   */
+  public void writeString(String str) throws IOException {
+    writeString(new Utf8(str));
+  }
+
+  /**
+   * Write a byte string.
+   * @throws AvroTypeException If this is a stateful writer and a
+   *         byte-string is not expected
+   */
+  public void writeBytes(ByteBuffer bytes) throws IOException {
+    byteWriter.write(bytes);
+  }
+  
+  /**
+   * Write a byte string.
+   * @throws AvroTypeException If this is a stateful writer and a
+   * byte-string is not expected
+   */
+  public void writeBytes(byte[] bytes, int start, int len) throws IOException {
+    encodeLong(len, out);
+    out.write(bytes, start, len);
+  }
+  
+  /**
+   * Writes a byte string.
+   * Equivalent to <tt>writeBytes(bytes, 0, bytes.length)</tt>
+   * @throws IOException 
+   * @throws AvroTypeException If this is a stateful writer and a
+   * byte-string is not expected
+   */
+  public void writeBytes(byte[] bytes) throws IOException {
+    writeBytes(bytes, 0, bytes.length);
+  }
+
+  /**
+   * Writes a fixed size binary object.
+   * @param bytes The contents to write
+   * @param start The position within <tt>bytes</tt> where the contents
+   * start.
+   * @param len The number of bytes to write.
+   * @throws AvroTypeException If this is a stateful writer and a
+   * fixed size binary object is not expected
+   * @throws IOException
+   */
+  public void writeFixed(byte[] bytes, int start, int len) throws IOException {
+	  out.write(bytes, start, len);
+  }
+
+  /**
+   * A shorthand for <tt>writeFixed(bytes, 0, bytes.length)</tt>
+   * @param bytes
+   */
+  public void writeFixed(byte[] bytes) throws IOException {
+    writeFixed(bytes, 0, bytes.length);
+  }
+  
+  /**
+   * Writes an enumeration.
+   * @param e
+   * @throws AvroTypeException If this is a stateful writer and an enumeration
+   * is not expected or <tt>e</tt> is out of range.
+   * @throws IOException
+   */
+  public void writeEnum(int e) throws IOException {
+    encodeLong(e, out);
+  }
+
+  /** Call this method to start writing an array.
+   *
+   *  When starting to serialize an array, call {@link
+   *  #writeArrayStart}. Then, before writing any data for any item
+   *  call {@link #setItemCount} followed by a sequence of
+   *  {@link #startItem()} and the item itself. The number of
+   *  {@link #startItem()} should match the number specified in
+   *  {@link #setItemCount}.
+   *  When actually writing the data of the item, you can call any {@link
+   *  ValueWriter} method (e.g., {@link #writeLong}).  When all items
+   *  of the array have been written, call {@link #writeArrayEnd}.
+   *
+   *  As an example, let's say you want to write an array of records,
+   *  the record consisting of a Long field and a Boolean field.
+   *  Your code would look something like this:
+   *  <pre>
+   *  out.writeArrayStart();
+   *  out.setItemCount(list.size());
+   *  for (Record r : list) {
+   *    out.startItem();
+   *    out.writeLong(r.longField);
+   *    out.writeBoolean(r.boolField);
+   *  }
+   *  out.writeArrayEnd();
+   *  </pre>
+   *  @throws AvroTypeException If this is a stateful writer and an
+   *          array is not expected
+   */
+  public void writeArrayStart() throws IOException {
+  }
+
+  /**
+   * Call this method before writing a batch of items in an array or a map.
+   * Then for each item, call {@link #startItem()} followed by any of the
+   * other write methods of {@link ValueWriter}. The number of calls
+   * to {@link #startItem()} must be equal to the count specified
+   * in {@link #setItemCount}. Once a batch is completed you
+   * can start another batch with {@link #setItemCount}.
+   * 
+   * @param itemCount The number of {@link #startItem()} calls to follow.
+   * @throws IOException
+   */
+  public void setItemCount(long itemCount) throws IOException {
+    if (itemCount > 0) {
+      writeLong(itemCount);
+    }
+  }
+  
+  /**
+   * Start a new item of an array or map.
+   * See {@link #writeArrayStart} for usage information.
+   * @throws AvroTypeException If called outside of an array or map context
+   */
+  public void startItem() throws IOException {
+  }
+
+  /**
+   * Call this method to finish writing an array.
+   * See {@link #writeArrayStart} for usage information.
+   *
+   * @throws AvroTypeException If the number of items written does not match
+   *          the count provided to {@link #setItemCount}
+   * @throws AvroTypeException If not currently inside an array
+   */
+  public void writeArrayEnd() throws IOException {
+    encodeLong(0, out);
+  }
+
+  /**
+   * Call this to start a new map.  See
+   * {@link #writeArrayStart} for details on usage.
+   *
+   * As an example of usage, let's say you want to write a map of
+   * records, the record consisting of a Long field and a Boolean
+   * field.  Your code would look something like this:
+   * <pre>
+   * out.writeMapStart();
+   * out.setItemCount(map.size());
+   * for (Map.Entry<String,Record> entry : map.entrySet()) {
+   *   out.startItem();
+   *   out.writeString(entry.getKey());
+   *   out.writeLong(entry.getValue().longField);
+   *   out.writeBoolean(entry.getValue().boolField);
+   * }
+   * out.writeMapEnd();
+   * </pre>
+   * @throws AvroTypeException If this is a stateful writer and a
+   * map is not expected
+   */
+  public void writeMapStart() throws IOException {
+  }
+
+  /**
+   * Call this method to terminate the inner-most, currently-opened
+   * map.  See {@link #writeArrayStart} for more details.
+   *
+   * @throws AvroTypeException If the number of items written does not match
+   *          the count provided to {@link #setItemCount}
+   * @throws AvroTypeException If not currently inside a map
+   */
+  public void writeMapEnd() throws IOException {
+    encodeLong(0, out);
+  }
+
+  /** Call this method to write the tag of a union.
+   *
+   * As an example of usage, let's say you want to write a union,
+   * whose second branch is a record consisting of a Long field and
+   * a Boolean field.  Your code would look something like this:
+   * <pre>
+   * out.writeIndex(1);
+   * out.writeLong(record.longField);
+   * out.writeBoolean(record.boolField);
+   * </pre>
+   * @throws AvroTypeException If this is a stateful writer and a
+   * union is not expected
+   */
+  public void writeIndex(int unionIndex) throws IOException {
+    encodeLong(unionIndex, out);
+  }
+  
+  protected static void encodeLong(long n, OutputStream o) throws IOException {
+    n = (n << 1) ^ (n >> 63); // move sign to low-order bit
+    while ((n & ~0x7F) != 0) {
+      o.write((byte)((n & 0x7f) | 0x80));
+      n >>>= 7;
+    }
+    o.write((byte)n);
+  }
+
+  protected static int encodeLong(long n, byte[] b, int pos) {
+    n = (n << 1) ^ (n >> 63); // move sign to low-order bit
     while ((n & ~0x7F) != 0) {
-      out.write((byte)((n & 0x7f) | 0x80));
+      b[pos++] = (byte)((n & 0x7f) | 0x80);
       n >>>= 7;
     }
-    out.write((byte)n);
+    b[pos++] = (byte) n;
+    return pos;
   }
 
-  /** Writes a float as eight bytes. */
-  public void writeFloat(float n) throws IOException {
-    int bits = Float.floatToRawIntBits(n);
-    out.write((int)(bits      ) & 0xFF);
-    out.write((int)(bits >>  8) & 0xFF);
-    out.write((int)(bits >> 16) & 0xFF);
-    out.write((int)(bits >> 24) & 0xFF);
-  }
-
-  /** Writes a double as eight bytes. */
-  public void writeDouble(double n) throws IOException {
-    long bits = Double.doubleToRawLongBits(n);
-    out.write((int)(bits      ) & 0xFF);
-    out.write((int)(bits >>  8) & 0xFF);
-    out.write((int)(bits >> 16) & 0xFF);
-    out.write((int)(bits >> 24) & 0xFF);
-    out.write((int)(bits >> 32) & 0xFF);
-    out.write((int)(bits >> 40) & 0xFF);
-    out.write((int)(bits >> 48) & 0xFF);
-    out.write((int)(bits >> 56) & 0xFF);
+  protected static void encodeFloat(float f, OutputStream o) throws IOException {
+    long bits = Float.floatToRawIntBits(f);
+    o.write((int)(bits      ) & 0xFF);
+    o.write((int)(bits >>  8) & 0xFF);
+    o.write((int)(bits >> 16) & 0xFF);
+    o.write((int)(bits >> 24) & 0xFF);
   }
- 
- /** Writes a boolean as a single byte. */
-  public void writeBoolean(boolean b) throws IOException {
-    out.write(b ? 1 : 0);
+
+  protected static int encodeFloat(float f, byte[] b, int pos) {
+    long bits = Float.floatToRawIntBits(f);
+    b[pos++] = (byte)((bits      ) & 0xFF);
+    b[pos++] = (byte)((bits >>  8) & 0xFF);
+    b[pos++] = (byte)((bits >> 16) & 0xFF);
+    b[pos++] = (byte)((bits >> 24) & 0xFF);
+    return pos;
   }
 
-  public void write(byte b[], int off, int len) throws IOException {
-    out.write(b, off, len);
+  protected static void encodeDouble(double d, OutputStream o) throws IOException {
+    long bits = Double.doubleToRawLongBits(d);
+    o.write((int)(bits      ) & 0xFF);
+    o.write((int)(bits >>  8) & 0xFF);
+    o.write((int)(bits >> 16) & 0xFF);
+    o.write((int)(bits >> 24) & 0xFF);
+    o.write((int)(bits >> 32) & 0xFF);
+    o.write((int)(bits >> 40) & 0xFF);
+    o.write((int)(bits >> 48) & 0xFF);
+    o.write((int)(bits >> 56) & 0xFF);
   }
 
+  protected static int encodeDouble(double d, byte[] b, int pos) {
+    long bits = Double.doubleToRawLongBits(d);
+    b[pos++] = (byte)((bits      ) & 0xFF);
+    b[pos++] = (byte)((bits >>  8) & 0xFF);
+    b[pos++] = (byte)((bits >> 16) & 0xFF);
+    b[pos++] = (byte)((bits >> 24) & 0xFF);
+    b[pos++] = (byte)((bits >> 32) & 0xFF);
+    b[pos++] = (byte)((bits >> 40) & 0xFF);
+    b[pos++] = (byte)((bits >> 48) & 0xFF);
+    b[pos++] = (byte)((bits >> 56) & 0xFF);
+    return pos;
+  }
 }

Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferInputStream.java?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferInputStream.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferInputStream.java Fri Jun  5 18:31:45 2009
@@ -25,7 +25,7 @@
 import java.util.List;
 
 /** Utility to present {@link ByteBuffer} data as an {@link InputStream}.*/
-class ByteBufferInputStream extends InputStream {
+public class ByteBufferInputStream extends InputStream {
   private List<ByteBuffer> buffers;
   private int current;
 

Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferOutputStream.java?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferOutputStream.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferOutputStream.java Fri Jun  5 18:31:45 2009
@@ -24,7 +24,7 @@
 
 /** Utility to collect data written to an {@link OutputStream} in {@link
  * ByteBuffer}s.*/
-class ByteBufferOutputStream extends OutputStream {
+public class ByteBufferOutputStream extends OutputStream {
   public static final int BUFFER_SIZE = 8192;
 
   private List<ByteBuffer> buffers;

Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferValueReader.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferValueReader.java?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferValueReader.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferValueReader.java Fri Jun  5 18:31:45 2009
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.avro.ipc;
-
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import org.apache.avro.io.ValueReader;
-
-/** A {@link ValueReader} that reads from {@link ByteBuffer}s.*/
-class ByteBufferValueReader extends ValueReader {
-  public ByteBufferValueReader(List<ByteBuffer> buffers) {
-    super(new ByteBufferInputStream(buffers));
-  }
-
-  @Override
-  public ByteBuffer readBuffer(Object old) throws IOException {
-    if (old != null)                              // punt
-      return super.readBuffer(old);
-    return ((ByteBufferInputStream)in).readBuffer((int)readLong());
-  }
-
-
-}

Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferValueWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferValueWriter.java?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferValueWriter.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferValueWriter.java Fri Jun  5 18:31:45 2009
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.avro.ipc;
-
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import org.apache.avro.io.ValueWriter;
-
-/** A {@link ValueWriter} that writes to {@link ByteBuffer}s.*/
-class ByteBufferValueWriter extends ValueWriter {
-  public ByteBufferValueWriter() {
-    super(new ByteBufferOutputStream());
-  }
-
-  @Override
-  public void writeBuffer(ByteBuffer buffer) throws IOException {
-    writeLong(buffer.remaining());
-    ((ByteBufferOutputStream)out).writeBuffer(buffer);
-  }
-
-  /** Return the list of {@link ByteBuffer}s collected. */
-  public List<ByteBuffer> getBufferList() {
-    return ((ByteBufferOutputStream)out).getBufferList();
-  }
-
-}

Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java Fri Jun  5 18:31:45 2009
@@ -57,7 +57,8 @@
     ValueReader in;
     Message m;
     do {
-      ByteBufferValueWriter out = new ByteBufferValueWriter();
+      ByteBufferOutputStream bbo = new ByteBufferOutputStream();
+      ValueWriter out = new ValueWriter(bbo);
 
       if (!established)                           // if not established
         writeHandshake(out);                      // prepend handshake
@@ -67,13 +68,14 @@
       if (m == null)
         throw new AvroRuntimeException("Not a local message: "+messageName);
       
-      out.writeUtf8(new Utf8(m.getName()));       // write message name
+      out.writeString(m.getName());       // write message name
       writeRequest(m.getRequest(), request, out); // write request payload
       
       List<ByteBuffer> response =                 // transceive
-        getTransceiver().transceive(out.getBufferList());
+        getTransceiver().transceive(bbo.getBufferList());
       
-      in = new ByteBufferValueReader(response);
+      ByteBufferInputStream bbi = new ByteBufferInputStream(response);
+      in = new ValueReader(bbi);
       if (!established)                           // if not established
         readHandshake(in);                        // process handshake
     } while (!established);

Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java Fri Jun  5 18:31:45 2009
@@ -55,16 +55,21 @@
   /** Called by a server to deserialize a request, compute and serialize
    * a response or error. */
   public List<ByteBuffer> respond(Transceiver transceiver) throws IOException {
-    ValueReader in = new ByteBufferValueReader(transceiver.readBuffers());
-    ByteBufferValueWriter out = new ByteBufferValueWriter();
+    ByteBufferInputStream bbi =
+      new ByteBufferInputStream(transceiver.readBuffers());
+    
+    ValueReader in = new ValueReader(bbi);
+    ByteBufferOutputStream bbo =
+      new ByteBufferOutputStream();
+    ValueWriter out = new ValueWriter(bbo);
     AvroRemoteException error = null;
     try {
       Protocol remote = handshake(transceiver, in, out);
       if (remote == null)                        // handshake failed
-        return out.getBufferList();
+        return bbo.getBufferList();
 
       // read request using remote protocol specification
-      String messageName = in.readUtf8(null).toString();
+      String messageName = in.readString(null).toString();
       Message m = remote.getMessages().get(messageName);
       if (m == null)
         throw new AvroRuntimeException("No such remote message: "+messageName);
@@ -94,12 +99,13 @@
     } catch (AvroRuntimeException e) {            // system error
       LOG.warn("system error", e);
       error = new AvroRemoteException(e);
-      out = new ByteBufferValueWriter();
+      bbo = new ByteBufferOutputStream();
+      out = new ValueWriter(bbo);
       out.writeBoolean(true);
       writeError(Protocol.SYSTEM_ERRORS, error, out);
     }
       
-    return out.getBufferList();
+    return bbo.getBufferList();
   }
 
   private SpecificDatumWriter handshakeWriter =

Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java Fri Jun  5 18:31:45 2009
@@ -66,7 +66,7 @@
     }
   }
 
-  @Test
+  @Test(dependsOnMethods="testGenericWrite")
   public void testGenericRead() throws IOException {
     DataFileReader<Object> reader =
       new DataFileReader<Object>(new SeekableFileInput(FILE),

Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGeneric.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGeneric.java?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGeneric.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGeneric.java Fri Jun  5 18:31:45 2009
@@ -135,9 +135,9 @@
       new GenericData.Record(PROTOCOL.getMessages().get("echoBytes").getRequest());
     ByteBuffer data = ByteBuffer.allocate(length);
     random.nextBytes(data.array());
+    data.flip();
     params.put("data", data);
     Object echoed = requestor.request("echoBytes", params);
-    data.flip();
     assertEquals(data, echoed);
   }
 

Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java?rev=782092&r1=782091&r2=782092&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java Fri Jun  5 18:31:45 2009
@@ -98,8 +98,8 @@
     int length = random.nextInt(1024*16);
     ByteBuffer data = ByteBuffer.allocate(length);
     random.nextBytes(data.array());
-    ByteBuffer echoed = proxy.echoBytes(data);
     data.flip();
+    ByteBuffer echoed = proxy.echoBytes(data);
     assertEquals(data, echoed);
   }