You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2011/09/12 22:24:05 UTC

svn commit: r1169908 - in /avro/trunk: ./ lang/java/ lang/java/avro/src/main/java/org/apache/avro/generic/ lang/java/avro/src/main/java/org/apache/avro/reflect/ lang/java/avro/src/main/java/org/apache/avro/specific/ lang/java/protobuf/ lang/java/protob...

Author: cutting
Date: Mon Sep 12 20:24:04 2011
New Revision: 1169908

URL: http://svn.apache.org/viewvc?rev=1169908&view=rev
Log:
AVRO-805: Java: Add support for reading and writing instances of Protocol Buffer (protobuf) generated classes.

Added:
    avro/trunk/lang/java/protobuf/   (with props)
    avro/trunk/lang/java/protobuf/pom.xml
    avro/trunk/lang/java/protobuf/src/
    avro/trunk/lang/java/protobuf/src/main/
    avro/trunk/lang/java/protobuf/src/main/java/
    avro/trunk/lang/java/protobuf/src/main/java/org/
    avro/trunk/lang/java/protobuf/src/main/java/org/apache/
    avro/trunk/lang/java/protobuf/src/main/java/org/apache/avro/
    avro/trunk/lang/java/protobuf/src/main/java/org/apache/avro/protobuf/
    avro/trunk/lang/java/protobuf/src/main/java/org/apache/avro/protobuf/ProtobufData.java
    avro/trunk/lang/java/protobuf/src/main/java/org/apache/avro/protobuf/ProtobufDatumReader.java
    avro/trunk/lang/java/protobuf/src/main/java/org/apache/avro/protobuf/ProtobufDatumWriter.java
    avro/trunk/lang/java/protobuf/src/main/java/org/apache/avro/protobuf/package.html
    avro/trunk/lang/java/protobuf/src/test/
    avro/trunk/lang/java/protobuf/src/test/java/
    avro/trunk/lang/java/protobuf/src/test/java/org/
    avro/trunk/lang/java/protobuf/src/test/java/org/apache/
    avro/trunk/lang/java/protobuf/src/test/java/org/apache/avro/
    avro/trunk/lang/java/protobuf/src/test/java/org/apache/avro/protobuf/
    avro/trunk/lang/java/protobuf/src/test/java/org/apache/avro/protobuf/TestProtobuf.java
    avro/trunk/lang/java/protobuf/src/test/protobuf/
    avro/trunk/lang/java/protobuf/src/test/protobuf/test.proto
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java
    avro/trunk/lang/java/pom.xml

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1169908&r1=1169907&r2=1169908&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Mon Sep 12 20:24:04 2011
@@ -12,6 +12,11 @@ Avro 1.6.0 (unreleased)
     deprecated and may be removed in a future release.  (James
     Baldassari via cutting)
 
+    AVRO-805: Java: Add support for reading and writing instances of
+    Protocol Buffer (protobuf) generated classes.  This permits
+    protobuf-defined data structures to be written to and read from
+    Avro-format data files.  (cutting)
+
   OPTIMIZATIONS
 
     AVRO-853: Java: Cache Schema hash codes. (cutting)

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java?rev=1169908&r1=1169907&r2=1169908&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java Mon Sep 12 20:24:04 2011
@@ -301,10 +301,9 @@ public class GenericData {
   public boolean validate(Schema schema, Object datum) {
     switch (schema.getType()) {
     case RECORD:
-      if (!(datum instanceof IndexedRecord)) return false;
-      IndexedRecord fields = (IndexedRecord)datum;
+      if (!isRecord(datum)) return false;
       for (Field f : schema.getFields()) {
-        if (!validate(f.schema(), fields.get(f.pos())))
+        if (!validate(f.schema(), getField(datum, f.name(), f.pos())))
           return false;
       }
       return true;
@@ -352,15 +351,15 @@ public class GenericData {
   }
   /** Renders a Java datum as <a href="http://www.json.org/">JSON</a>. */
   protected void toString(Object datum, StringBuilder buffer) {
-    if (datum instanceof IndexedRecord) {
+    if (isRecord(datum)) {
       buffer.append("{");
       int count = 0;
-      IndexedRecord record = (IndexedRecord)datum;
-      for (Field f : record.getSchema().getFields()) {
+      Schema schema = getRecordSchema(datum);
+      for (Field f : schema.getFields()) {
         toString(f.name(), buffer);
         buffer.append(": ");
-        toString(record.get(f.pos()), buffer);
-        if (++count < record.getSchema().getFields().size())
+        toString(getField(datum, f.name(), f.pos()), buffer);
+        if (++count < schema.getFields().size())
           buffer.append(", ");
       }
       buffer.append("}");
@@ -450,8 +449,8 @@ public class GenericData {
 
   /** Create a schema given an example datum. */
   public Schema induce(Object datum) {
-    if (datum instanceof IndexedRecord) {
-      return ((IndexedRecord)datum).getSchema();
+    if (isRecord(datum)) {
+      return getRecordSchema(datum);
     } else if (datum instanceof Collection) {
       Schema elementType = null;
       for (Object element : (Collection<?>)datum) {
@@ -615,11 +614,11 @@ public class GenericData {
     int hashCode = 1;
     switch (s.getType()) {
     case RECORD:
-      IndexedRecord r = (IndexedRecord)o;
       for (Field f : s.getFields()) {
         if (f.order() == Field.Order.IGNORE)
           continue;
-        hashCode = hashCodeAdd(hashCode, r.get(f.pos()), f.schema());
+        hashCode = hashCodeAdd(hashCode,
+                               getField(o, f.name(), f.pos()), f.schema());
       }
       return hashCode;
     case ARRAY:

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java?rev=1169908&r1=1169907&r2=1169908&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java Mon Sep 12 20:24:04 2011
@@ -130,34 +130,15 @@ public class ReflectData extends Specifi
   @Override
   public boolean validate(Schema schema, Object datum) {
     switch (schema.getType()) {
-    case RECORD:
-      if (datum == null) return false;
-      Class c = datum.getClass(); 
-      for (Schema.Field f : schema.getFields()) {
-        try {
-          if (!validate(f.schema(),
-                        getField(c, f.name()).get(datum)))
+    case ARRAY:
+      if (!datum.getClass().isArray())
+        return super.validate(schema, datum);
+      int length = java.lang.reflect.Array.getLength(datum);
+      for (int i = 0; i < length; i++)
+        if (!validate(schema.getElementType(),
+                      java.lang.reflect.Array.get(datum, i)))
           return false;
-        } catch (IllegalAccessException e) {
-          throw new AvroRuntimeException(e);
-        }
-      }
       return true;
-    case ARRAY:
-      if (datum instanceof Collection) {          // collection
-        for (Object element : (Collection)datum)
-          if (!validate(schema.getElementType(), element))
-            return false;
-        return true;
-      } else if (datum.getClass().isArray()) {    // array
-        int length = java.lang.reflect.Array.getLength(datum);
-        for (int i = 0; i < length; i++)
-          if (!validate(schema.getElementType(),
-                        java.lang.reflect.Array.get(datum, i)))
-            return false;
-        return true;
-      }
-      return false;
     default:
       return super.validate(schema, datum);
     }

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java?rev=1169908&r1=1169907&r2=1169908&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java Mon Sep 12 20:24:04 2011
@@ -112,7 +112,7 @@ public class SpecificData extends Generi
   }
 
   /** Returns the Java class name indicated by a schema's name and namespace. */
-  public String getClassName(Schema schema) {
+  public static String getClassName(Schema schema) {
     String namespace = schema.getNamespace();
     String name = schema.getName();
     if (namespace == null || "".equals(namespace))

Modified: avro/trunk/lang/java/pom.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/pom.xml?rev=1169908&r1=1169907&r2=1169908&view=diff
==============================================================================
--- avro/trunk/lang/java/pom.xml (original)
+++ avro/trunk/lang/java/pom.xml Mon Sep 12 20:24:04 2011
@@ -46,6 +46,7 @@
     <netty-version>3.2.4.Final</netty-version> 
     <jopt-simple-version>3.2</jopt-simple-version>
     <snappy-version>1.0.3.2</snappy-version>
+    <protobuf-version>2.4.1</protobuf-version>
   </properties>
 
   <build>
@@ -402,6 +403,7 @@
     <module>ipc</module>
     <module>tools</module>
     <module>mapred</module>
+    <module>protobuf</module>
   </modules>
 
 </project>

Propchange: avro/trunk/lang/java/protobuf/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Mon Sep 12 20:24:04 2011
@@ -0,0 +1 @@
+target

Added: avro/trunk/lang/java/protobuf/pom.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/protobuf/pom.xml?rev=1169908&view=auto
==============================================================================
--- avro/trunk/lang/java/protobuf/pom.xml (added)
+++ avro/trunk/lang/java/protobuf/pom.xml Mon Sep 12 20:24:04 2011
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+  xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>avro-parent</artifactId>
+    <groupId>org.apache.avro</groupId>
+    <version>1.6.0-SNAPSHOT</version>
+    <relativePath>../</relativePath>
+  </parent>
+  <artifactId>avro-protobuf</artifactId>
+  <name>Apache Avro Protobuf Compatibility</name>
+  <description>Permit serialization of Protobuf-generated classes as
+  Avro data.</description>
+
+  <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>avro</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>${protobuf-version}</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <!-- Runs protoc over the test .proto file.  The generated classes
+             are only used by the unit tests, so they are produced in the
+             generate-test-sources phase and registered as a test source
+             root; this keeps them out of the main artifact. -->
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>generate-test-sources</id>
+            <phase>generate-test-sources</phase>
+            <configuration>
+              <tasks>
+                <mkdir dir="target/generated-test-sources"/>
+                <!-- failonerror: stop the build when protoc reports an
+                     error rather than failing later at compile time -->
+                <exec executable="protoc" failonerror="true">
+                  <arg value="--java_out=target/generated-test-sources"/>
+                  <arg value="src/test/protobuf/test.proto"/>
+                </exec>
+              </tasks>
+              <testSourceRoot>target/generated-test-sources</testSourceRoot>
+            </configuration>
+            <goals>
+              <goal>run</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

Added: avro/trunk/lang/java/protobuf/src/main/java/org/apache/avro/protobuf/ProtobufData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/protobuf/src/main/java/org/apache/avro/protobuf/ProtobufData.java?rev=1169908&view=auto
==============================================================================
--- avro/trunk/lang/java/protobuf/src/main/java/org/apache/avro/protobuf/ProtobufData.java (added)
+++ avro/trunk/lang/java/protobuf/src/main/java/org/apache/avro/protobuf/ProtobufData.java Mon Sep 12 20:24:04 2011
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.protobuf;
+
+import java.util.List;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.IdentityHashMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import java.io.IOException;
+import java.io.File;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.specific.SpecificData;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message.Builder;
+import com.google.protobuf.MessageOrBuilder;
+import com.google.protobuf.ProtocolMessageEnum;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.EnumDescriptor;
+import com.google.protobuf.Descriptors.EnumValueDescriptor;
+import com.google.protobuf.Descriptors.FileDescriptor;
+import com.google.protobuf.DescriptorProtos.FileOptions;
+
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.JsonNodeFactory;
+
+/** Utilities for serializing Protobuf data in Avro format. */
+public class ProtobufData extends GenericData {
+  private static final String PROTOBUF_TYPE = "protobuf";
+
+  private static final ProtobufData INSTANCE = new ProtobufData();
+
+  protected ProtobufData() {}
+  
+  /** Return the singleton instance. */
+  public static ProtobufData get() { return INSTANCE; }
+
+  /** Sets the field at Avro position <code>position</code> on a protobuf
+   * {@link Builder}.
+   * <p>Enum values arrive as generated Java enum constants and are converted
+   * to their protobuf value descriptors; for a repeated enum field each
+   * element of the list is converted.  A null value for a message field
+   * clears the field.  All other values are passed to the builder unchanged.
+   * @param record the {@link Builder} to modify
+   * @param name the field name (unused; the position selects the field)
+   * @param position the field's position in the Avro record schema
+   * @param o the value to store
+   */
+  @Override
+  public void setField(Object record, String name, int position, Object o) {
+    Builder b = (Builder)record;
+    FieldDescriptor f = getFieldDescriptor(b.getDescriptorForType(), position);
+    switch (f.getType()) {
+    case ENUM:
+      if (f.isRepeated()) {                       // convert each element
+        List<EnumValueDescriptor> values = new ArrayList<EnumValueDescriptor>();
+        for (Object e : (List<?>)o)
+          values.add(((ProtocolMessageEnum)e).getValueDescriptor());
+        b.setField(f, values);
+      } else {
+        b.setField(f, ((ProtocolMessageEnum)o).getValueDescriptor());
+      }
+      break;
+    case MESSAGE:
+      if (o == null) {                            // absent optional message
+        b.clearField(f);
+        break;
+      }
+      // fall through: a non-null message is stored like any other value
+    default:
+      b.setField(f, o);
+    }
+  }
+
+  /** Reads the field at Avro position <code>position</code> from a protobuf
+   * message or builder.
+   * <p>Enum values are returned as constants of the generated Java enum
+   * class (a list of them for a repeated enum field).  A singular message
+   * field that is not set is returned as null.  Repeated message fields are
+   * never passed to <code>hasField()</code>, which protobuf only supports
+   * for non-repeated fields.
+   * @param record the {@link MessageOrBuilder} to read
+   * @param name the field name (unused; the position selects the field)
+   * @param position the field's position in the Avro record schema
+   * @return the field value, or null for an unset singular message field
+   */
+  @Override
+  public Object getField(Object record, String name, int position) {
+    MessageOrBuilder b = (MessageOrBuilder)record;
+    FieldDescriptor f = getFieldDescriptor(b.getDescriptorForType(), position);
+    switch (f.getType()) {
+    case ENUM:
+      Schema s = getSchema(f);
+      try {
+        Class c = Class.forName(SpecificData.getClassName(s));
+        if (f.isRepeated()) {                     // convert each element
+          List<Object> symbols = new ArrayList<Object>();
+          for (Object v : (List<?>)b.getField(f))
+            symbols.add(Enum.valueOf(c, ((EnumValueDescriptor)v).getName()));
+          return symbols;
+        }
+        EnumValueDescriptor symbol = (EnumValueDescriptor)b.getField(f);
+        return Enum.valueOf(c, symbol.getName());
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    case MESSAGE:
+      if (!f.isRepeated() && !b.hasField(f))      // unset singular message
+        return null;
+      // fall through: set or repeated message fields are read directly
+    default:
+      return b.getField(f);
+    }
+  }
+
+  private final Map<Descriptor,FieldDescriptor[]> fieldCache =
+    new ConcurrentHashMap<Descriptor,FieldDescriptor[]>();
+
+  /** Returns the protobuf field that sits at position <code>pos</code> of
+   * the Avro record schema derived from <code>d</code>.  The per-descriptor
+   * lookup table is built on first use and kept in <code>fieldCache</code>. */
+  private FieldDescriptor getFieldDescriptor(Descriptor d, int pos) {
+    FieldDescriptor[] byPosition = fieldCache.get(d);
+    if (byPosition != null)                       // cache hit
+      return byPosition[pos];
+
+    List<Field> avroFields = getSchema(d).getFields();
+    byPosition = new FieldDescriptor[avroFields.size()];
+    for (Field avroField : avroFields)
+      byPosition[avroField.pos()] = d.findFieldByName(avroField.name());
+    fieldCache.put(d, byPosition);                // remember for next time
+    return byPosition[pos];
+  }
+
+  /** Treats any protobuf {@link MessageOrBuilder} as an Avro record. */
+  @Override
+  protected boolean isRecord(Object datum) {
+    return datum instanceof MessageOrBuilder;
+  }
+
+  /** Creates the object that fields of a record are read into.
+   * <p>The Java class is located by the name derived from the schema.  If
+   * <code>old</code> is already an instance of that class it is returned for
+   * reuse; otherwise the class's static <code>newBuilder()</code> method is
+   * invoked and its result returned.
+   * @param old a previously created instance that may be reused, or null
+   * @param schema the record schema naming the protobuf class
+   * @throws RuntimeException wrapping any reflection failure, including a
+   * class that cannot be found
+   */
+  @Override
+  public Object newRecord(Object old, Schema schema) {
+    try {
+      Class c = Class.forName(SpecificData.getClassName(schema));
+      if (c == null)
+        // delegate to the superclass; calling this method again here would
+        // recurse without end
+        return super.newRecord(old, schema);      // punt to generic
+      if (c.isInstance(old))
+        return old;                               // reuse instance
+      return c.getMethod("newBuilder").invoke(null);
+
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Treats any {@link List} as an Avro array. */
+  @Override
+  protected boolean isArray(Object datum) {
+    return datum instanceof List;
+  }
+
+  /** Treats protobuf {@link ByteString} values as Avro bytes. */
+  @Override
+  protected boolean isBytes(Object datum) {
+    return datum instanceof ByteString;
+  }
+
+  /** Returns the schema derived from the descriptor of the given message or
+   * builder. */
+  @Override
+  protected Schema getRecordSchema(Object record) {
+    return getSchema(((MessageOrBuilder)record).getDescriptorForType());
+  }
+
+  private final Map<Class,Schema> schemaCache
+    = new ConcurrentHashMap<Class,Schema>();
+
+  /** Return a record schema given a protobuf message class.  The class's
+   * static <code>getDescriptor()</code> method is invoked reflectively and
+   * the resulting schema is cached per class. */
+  public Schema getSchema(Class c) {
+    Schema cached = schemaCache.get(c);
+    if (cached != null)                           // cache hit
+      return cached;
+
+    Schema derived;
+    try {
+      Descriptor d = (Descriptor)c.getMethod("getDescriptor").invoke(null);
+      derived = getSchema(d);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    schemaCache.put(c, derived);                  // remember for next time
+    return derived;
+  }
+
+  private static final ThreadLocal<Map<Descriptor,Schema>> SEEN
+    = new ThreadLocal<Map<Descriptor,Schema>>() {
+    protected Map<Descriptor,Schema> initialValue() {
+      return new IdentityHashMap<Descriptor,Schema>();
+    }
+  };
+
+  /** Builds the Avro record schema for a protobuf message descriptor.
+   * <p>Each protobuf field becomes a record field; repeated fields are
+   * wrapped in an array schema.  A per-thread map of descriptors already
+   * being converted lets self-referential message types terminate: a
+   * descriptor found in the map yields the (possibly still incomplete)
+   * schema registered for it.  The outermost call empties the map when it
+   * finishes.
+   * <p>NOTE(review): the record name is the descriptor's simple name and the
+   * namespace comes from the file only, so messages nested inside other
+   * messages may not map to their generated class name; confirm.
+   */
+  private Schema getSchema(Descriptor descriptor) {
+    Map<Descriptor,Schema> seen = SEEN.get();
+    if (seen.containsKey(descriptor))             // stop recursion
+      return seen.get(descriptor);
+    boolean first = seen.isEmpty();               // true for the outermost call
+    try {
+      Schema result =
+        Schema.createRecord(descriptor.getName(), null,
+                            getNamespace(descriptor.getFile()), false);
+
+      // register before visiting fields so recursive references resolve
+      seen.put(descriptor, result);
+        
+      List<Field> fields = new ArrayList<Field>();
+      for (FieldDescriptor f : descriptor.getFields()) {
+        Schema s = getSchema(f);
+        if (f.isRepeated())
+          s = Schema.createArray(s);
+        fields.add(new Field(f.getName(), s, null, getDefault(f)));
+      }
+      result.setFields(fields);
+      return result;
+
+    } finally {
+      if (first)                                  // outermost call: reset state
+        seen.clear();
+    }
+  }
+
+  /** Computes the Avro namespace for types declared in a .proto file.
+   * <p>The result is the Java package (the <code>java_package</code> option
+   * when present, else the proto package), followed by the outer class name
+   * and a trailing "$".  The outer class name is the
+   * <code>java_outer_classname</code> option when present; otherwise it is
+   * the file's base name, without its extension, converted to camel case.
+   * When the package is empty no leading "." is emitted.
+   */
+  private String getNamespace(FileDescriptor fd) {
+    FileOptions o = fd.getOptions();
+    String p = o.hasJavaPackage()
+      ? o.getJavaPackage()
+      : fd.getPackage();
+    String outer;
+    if (o.hasJavaOuterClassname()) {
+      outer = o.getJavaOuterClassname();
+    } else {
+      outer = new File(fd.getName()).getName();
+      int dot = outer.lastIndexOf('.');
+      if (dot >= 0)                               // strip extension, if any
+        outer = outer.substring(0, dot);
+      outer = toCamelCase(outer);
+    }
+    String prefix = (p == null || p.length() == 0) ? "" : p + ".";
+    return prefix + outer + "$";
+  }
+
+  /** Splits <code>s</code> at underscores, capitalizes each piece with
+   * <code>cap</code>, and concatenates the pieces. */
+  private static String toCamelCase(String s){
+    StringBuilder joined = new StringBuilder();
+    for (String word : s.split("_"))
+      joined.append(cap(word));
+    return joined.toString();
+  }
+
+  /** Returns <code>s</code> with its first character upper-cased and the
+   * rest lower-cased.  An empty string is returned unchanged; empty pieces
+   * occur when a name has leading or consecutive underscores.  Case
+   * conversion uses the root locale so the result does not depend on the
+   * JVM's default locale. */
+  private static String cap(String s) {
+    if (s.isEmpty())
+      return s;
+    return s.substring(0, 1).toUpperCase(java.util.Locale.ROOT)
+      + s.substring(1).toLowerCase(java.util.Locale.ROOT);
+  }
+
+  private static final Schema NULL = Schema.create(Schema.Type.NULL);
+
+  /** Returns the Avro schema for the element type of a protobuf field.
+   * <p>Scalar types map to the corresponding Avro primitive; all 32-bit
+   * integer variants map to int and all 64-bit variants to long.  Enums
+   * become Avro enums whose symbols are the protobuf value names.  Message
+   * fields become records, wrapped in a union with null when the field is
+   * optional.  Repetition is not handled here.
+   * @throws RuntimeException for groups and any other unhandled type
+   */
+  private Schema getSchema(FieldDescriptor f) {
+    Schema result;
+    switch (f.getType()) {
+    case BOOL:
+      return Schema.create(Schema.Type.BOOLEAN);
+    case FLOAT:
+      return Schema.create(Schema.Type.FLOAT);
+    case DOUBLE:
+      return Schema.create(Schema.Type.DOUBLE);
+    case STRING:
+      return Schema.create(Schema.Type.STRING);
+    case BYTES:
+      return Schema.create(Schema.Type.BYTES);
+    case INT32: case UINT32: case SINT32: case FIXED32: case SFIXED32:
+      return Schema.create(Schema.Type.INT);
+    case INT64: case UINT64: case SINT64: case FIXED64: case SFIXED64:
+      return Schema.create(Schema.Type.LONG);
+    case ENUM:
+      EnumDescriptor d = f.getEnumType();
+      List<String> symbols = new ArrayList<String>();
+      for (EnumValueDescriptor e : d.getValues()) {
+        symbols.add(e.getName());
+      }
+      return Schema.createEnum(d.getName(), null, getNamespace(d.getFile()),
+                               symbols);
+    case MESSAGE:
+      result = getSchema(f.getMessageType());
+      if (f.isOptional())
+        // wrap optional record fields in a union with null
+        result = Schema.createUnion(Arrays.asList(new Schema[] {NULL, result}));
+      return result;
+    case GROUP:                                   // groups are deprecated
+    default:
+      throw new RuntimeException("Unexpected type: "+f.getType());
+    }
+  }
+
+  private static final JsonFactory FACTORY = new JsonFactory();
+  private static final ObjectMapper MAPPER = new ObjectMapper(FACTORY);
+  private static final JsonNodeFactory NODES = JsonNodeFactory.instance;
+
+  /** Returns the Avro default value, as JSON, for a protobuf field.
+   * <p>Required and repeated fields have no default (null is returned).  A
+   * default declared in the .proto file is rendered to JSON text and parsed;
+   * an enum default is first replaced by its symbol name so that it is
+   * rendered as a string rather than as a descriptor object.  Without a
+   * declared default a per-type value is generated: false, 0, the empty
+   * string, the first enum symbol, or null for messages.
+   * <p>NOTE(review): a declared default for a bytes field is still passed to
+   * <code>toString()</code> as a ByteString; confirm it renders as valid
+   * JSON.
+   * @throws RuntimeException if the rendered default cannot be parsed, or
+   * for groups and any other unhandled type
+   */
+  private JsonNode getDefault(FieldDescriptor f) {
+    if (f.isRequired() || f.isRepeated())         // no default
+      return null;
+
+    if (f.hasDefaultValue()) {                    // parse spec'd default value
+      Object value = f.getDefaultValue();
+      if (value instanceof EnumValueDescriptor)   // enum: use the symbol name
+        value = ((EnumValueDescriptor)value).getName();
+      String json = toString(value);
+      try {
+        return MAPPER.readTree(FACTORY.createJsonParser(json));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    switch (f.getType()) {                        // generate default for type
+    case BOOL:
+      return NODES.booleanNode(false);
+    case FLOAT: case DOUBLE:
+    case INT32: case UINT32: case SINT32: case FIXED32: case SFIXED32:
+    case INT64: case UINT64: case SINT64: case FIXED64: case SFIXED64:
+      return NODES.numberNode(0);
+    case STRING: case BYTES:
+      return NODES.textNode("");
+    case ENUM:
+      return NODES.textNode(f.getEnumType().getValues().get(0).getName());
+    case MESSAGE:
+      return NODES.nullNode();
+    case GROUP:                                   // groups are deprecated
+    default:
+      throw new RuntimeException("Unexpected type: "+f.getType());
+    }
+  }
+
+}

Added: avro/trunk/lang/java/protobuf/src/main/java/org/apache/avro/protobuf/ProtobufDatumReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/protobuf/src/main/java/org/apache/avro/protobuf/ProtobufDatumReader.java?rev=1169908&view=auto
==============================================================================
--- avro/trunk/lang/java/protobuf/src/main/java/org/apache/avro/protobuf/ProtobufDatumReader.java (added)
+++ avro/trunk/lang/java/protobuf/src/main/java/org/apache/avro/protobuf/ProtobufDatumReader.java Mon Sep 12 20:24:04 2011
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.protobuf;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.ResolvingDecoder;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+
+/** {@link org.apache.avro.io.DatumReader DatumReader} for generated Protobuf
+ * classes. */
+public class ProtobufDatumReader<T> extends GenericDatumReader<T> {
+  /** Construct with no writer's or reader's schema (both are passed as
+   * null). */
+  public ProtobufDatumReader() {
+    this(null, null, ProtobufData.get());
+  }
+
+  /** Construct using the schema derived from a protobuf message class as
+   * both the writer's and reader's schema. */
+  public ProtobufDatumReader(Class<T> c) {
+    this(ProtobufData.get().getSchema(c));
+  }
+
+  /** Construct where the writer's and reader's schemas are the same. */
+  public ProtobufDatumReader(Schema schema) {
+    this(schema, schema, ProtobufData.get());
+  }
+
+  /** Construct given writer's and reader's schema. */
+  public ProtobufDatumReader(Schema writer, Schema reader) {
+    this(writer, reader, ProtobufData.get());
+  }
+
+  /** Construct given writer's and reader's schema and the
+   * {@link ProtobufData} instance to use. */
+  protected ProtobufDatumReader(Schema writer, Schema reader,
+                                ProtobufData data) {
+    super(writer, reader, data);
+  }
+
+  /** Reads a record through the superclass, which is expected to return a
+   * {@link Message.Builder}, and returns the built message. */
+  @Override
+  protected Object readRecord(Object old, Schema expected, 
+                              ResolvingDecoder in) throws IOException {
+    Message.Builder b = (Message.Builder)super.readRecord(old, expected, in);
+    return b.build();                             // build instance
+  }
+
+  /** Returns the constant named <code>symbol</code> of the Java enum class
+   * whose name is derived from the schema.  Any failure, including a class
+   * that cannot be loaded, is rethrown as a RuntimeException. */
+  @Override
+  protected Object createEnum(String symbol, Schema schema) {
+    try {
+      Class c = Class.forName(SpecificData.getClassName(schema));
+      // NOTE(review): Class.forName throws rather than returning null, so
+      // this branch looks unreachable -- confirm
+      if (c == null) return super.createEnum(symbol, schema); // punt to generic
+      return Enum.valueOf(c, symbol);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Reads bytes through the superclass and copies the resulting
+   * {@link ByteBuffer} into a protobuf {@link ByteString}. */
+  @Override
+  protected Object readBytes(Object old, Decoder in) throws IOException {
+    return ByteString.copyFrom((ByteBuffer)super.readBytes(old, in));
+  }    
+
+  /** Reads a string through the superclass and converts the result with
+   * <code>toString()</code>. */
+  @Override
+  protected Object readString(Object old, Decoder in) throws IOException {
+    return super.readString(old, in).toString();
+  }    
+
+}
+

Added: avro/trunk/lang/java/protobuf/src/main/java/org/apache/avro/protobuf/ProtobufDatumWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/protobuf/src/main/java/org/apache/avro/protobuf/ProtobufDatumWriter.java?rev=1169908&view=auto
==============================================================================
--- avro/trunk/lang/java/protobuf/src/main/java/org/apache/avro/protobuf/ProtobufDatumWriter.java (added)
+++ avro/trunk/lang/java/protobuf/src/main/java/org/apache/avro/protobuf/ProtobufDatumWriter.java Mon Sep 12 20:24:04 2011
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.protobuf;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.Encoder;
+
+import com.google.protobuf.ByteString;
+
+/** {@link org.apache.avro.io.DatumWriter DatumWriter} for generated protobuf
+ * classes. */
+public class ProtobufDatumWriter<T> extends GenericDatumWriter<T> {
+  /** Construct without a schema, using the shared {@link ProtobufData}
+   * instance. */
+  public ProtobufDatumWriter() {
+    super(ProtobufData.get());
+  }
+
+  /** Construct using the schema derived from a protobuf message class. */
+  public ProtobufDatumWriter(Class<T> c) {
+    super(ProtobufData.get().getSchema(c), ProtobufData.get());
+  }
+  
+  /** Construct for the given schema. */
+  public ProtobufDatumWriter(Schema schema) {
+    super(schema, ProtobufData.get());
+  }
+  
+  /** Construct for the given schema and {@link ProtobufData} instance. */
+  protected ProtobufDatumWriter(Schema root, ProtobufData protobufData) {
+    super(root, protobufData);
+  }
+  
+  /** Construct without a schema for the given {@link ProtobufData}
+   * instance. */
+  protected ProtobufDatumWriter(ProtobufData protobufData) {
+    super(protobufData);
+  }
+  
+  /** Writes a Java enum constant by its ordinal; any other datum is handed
+   * to the superclass. */
+  @Override
+  protected void writeEnum(Schema schema, Object datum, Encoder out)
+    throws IOException {
+    if (!(datum instanceof Enum))
+      super.writeEnum(schema, datum, out);        // punt to generic
+    else
+      out.writeEnum(((Enum)datum).ordinal());
+  }
+
+  /** Writes a protobuf {@link ByteString} by copying it to a byte array.
+   * The datum is cast to ByteString unconditionally. */
+  @Override
+  protected void writeBytes(Object datum, Encoder out) throws IOException {
+    ByteString bytes = (ByteString)datum; 
+    out.writeBytes(bytes.toByteArray(), 0, bytes.size());
+  }
+
+}
+

Added: avro/trunk/lang/java/protobuf/src/main/java/org/apache/avro/protobuf/package.html
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/protobuf/src/main/java/org/apache/avro/protobuf/package.html?rev=1169908&view=auto
==============================================================================
--- avro/trunk/lang/java/protobuf/src/main/java/org/apache/avro/protobuf/package.html (added)
+++ avro/trunk/lang/java/protobuf/src/main/java/org/apache/avro/protobuf/package.html Mon Sep 12 20:24:04 2011
@@ -0,0 +1,42 @@
+<html>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<body><a href="http://code.google.com/p/protobuf/">Protocol Buffer</a>
+  compatibility.
+
+<p>Protocol Buffer primitive types are mapped to Avro types as follows:
+<table>
+<tr><th>protobuf type</th><th>Avro type</th></tr>
+<tr><td>int32, uint32, sint32, fixed32, sfixed32</td><td>int</td></tr>
+<tr><td>int64, uint64, sint64, fixed64, sfixed64</td><td>long</td></tr>
+<tr><td>float</td><td>float</td></tr>
+<tr><td>double</td><td>double</td></tr>
+<tr><td>bool</td><td>boolean</td></tr>
+<tr><td>string</td><td>string</td></tr>
+<tr><td>bytes</td><td>bytes</td></tr>
+<tr><td>enum</td><td>enum</td></tr>
+<tr><td>message</td><td>record</td></tr>
+</table>
+<p>Notes:
+<ul>
+<li>protobuf repeated fields are represented as Avro arrays
+<li>protobuf default values are translated to Avro default values
+</ul>
+</body>
+</html>

Added: avro/trunk/lang/java/protobuf/src/test/java/org/apache/avro/protobuf/TestProtobuf.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/protobuf/src/test/java/org/apache/avro/protobuf/TestProtobuf.java?rev=1169908&view=auto
==============================================================================
--- avro/trunk/lang/java/protobuf/src/test/java/org/apache/avro/protobuf/TestProtobuf.java (added)
+++ avro/trunk/lang/java/protobuf/src/test/java/org/apache/avro/protobuf/TestProtobuf.java Mon Sep 12 20:24:04 2011
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.protobuf;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
+import com.google.protobuf.ByteString;
+
+import pb.Test.Foo;
+import pb.Test.A;
+
+public class TestProtobuf {
+  /**
+   * Builds a Foo with every field set, writes it with
+   * {@link ProtobufDatumWriter}, reads it back with
+   * {@link ProtobufDatumReader} and checks the result equals the original.
+   */
+  @Test public void testMessage() throws Exception {
+
+    System.out.println(ProtobufData.get().getSchema(Foo.class).toString(true));
+
+    // set each scalar field, the enum and one element of the repeated field
+    Foo.Builder fooBuilder = Foo.newBuilder()
+      .setInt32(0)
+      .setInt64(2)
+      .setUint32(3)
+      .setUint64(4)
+      .setSint32(5)
+      .setSint64(6)
+      .setFixed32(7)
+      .setFixed64(8)
+      .setSfixed32(9)
+      .setSfixed64(10)
+      .setFloat(1.0F)
+      .setDouble(2.0)
+      .setBool(true)
+      .setString("foo")
+      .setBytes(ByteString.copyFromUtf8("bar"))
+      .setEnum(A.X)
+      .addIntArray(27);
+
+    // snapshot the message so far and nest it in the recursive field
+    Foo nested = fooBuilder.build();
+    Foo foo = fooBuilder.setFoo(nested).build();
+
+    System.out.println(foo);
+
+    // encode to an in-memory buffer
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    ProtobufDatumWriter<Foo> writer = new ProtobufDatumWriter<Foo>(Foo.class);
+    Encoder encoder = EncoderFactory.get().binaryEncoder(buffer, null);
+    writer.write(foo, encoder);
+    encoder.flush();
+
+    // decode from the bytes just written
+    ProtobufDatumReader<Foo> reader = new ProtobufDatumReader<Foo>(Foo.class);
+    Object decoded = reader.read
+      (null,
+       DecoderFactory.get().createBinaryDecoder
+       (new ByteArrayInputStream(buffer.toByteArray()), null));
+
+    assertEquals(foo, decoded);
+
+  }
+}

Added: avro/trunk/lang/java/protobuf/src/test/protobuf/test.proto
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/protobuf/src/test/protobuf/test.proto?rev=1169908&view=auto
==============================================================================
--- avro/trunk/lang/java/protobuf/src/test/protobuf/test.proto (added)
+++ avro/trunk/lang/java/protobuf/src/test/protobuf/test.proto Mon Sep 12 20:24:04 2011
@@ -0,0 +1,35 @@
+package pb;
+
+message Foo {
+  // one field of each primitive type; int32 is the only required field
+  required    int32 int32    =  1;
+  optional    int64 int64    =  2;
+  optional   uint32 uint32   =  3;
+  optional   uint64 uint64   =  4;
+  optional   sint32 sint32   =  5;
+  optional   sint64 sint64   =  6;
+  optional  fixed32 fixed32  =  7;
+  optional  fixed64 fixed64  =  8;
+  optional sfixed32 sfixed32 =  9;
+  optional sfixed64 sfixed64 = 10;
+  optional    float float    = 11;
+  optional   double double   = 12;
+  optional     bool bool     = 13;
+  optional   string string   = 14;
+  optional    bytes bytes    = 15;
+  optional        A enum     = 16;
+
+  // a repeated field of int32 values
+  repeated    int32 intArray = 17;
+
+  // a recursive type: a Foo may itself contain a Foo
+  optional     Foo  foo      = 18;
+
+}
+
+// an enum, used as the type of Foo's "enum" field; values are numbered from 1
+enum A {
+  X = 1;
+  Y = 2;
+  Z = 3;
+}