Posted to commits@avro.apache.org by sh...@apache.org on 2009/04/15 07:24:12 UTC

svn commit: r765053 - in /hadoop/avro/trunk: ./ src/py/avro/ src/test/java/org/apache/avro/ src/test/py/

Author: sharad
Date: Wed Apr 15 05:24:09 2009
New Revision: 765053

URL: http://svn.apache.org/viewvc?rev=765053&view=rev
Log:
AVRO-5. Add java versus python RPC interoperability tests.

Added:
    hadoop/avro/trunk/src/test/py/interoptests.py
Modified:
    hadoop/avro/trunk/CHANGES.txt
    hadoop/avro/trunk/build.xml
    hadoop/avro/trunk/src/py/avro/ipc.py
    hadoop/avro/trunk/src/py/avro/schema.py
    hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java
    hadoop/avro/trunk/src/test/py/testio.py
    hadoop/avro/trunk/src/test/py/testioreflect.py

Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=765053&r1=765052&r2=765053&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Wed Apr 15 05:24:09 2009
@@ -17,6 +17,9 @@
 
     AVRO-13. Use dictionary instead of if-else in validate. (sharad)
 
+    AVRO-5. Add java versus python RPC interoperability tests. 
+    (sharad)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/avro/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/build.xml?rev=765053&r1=765052&r2=765053&view=diff
==============================================================================
--- hadoop/avro/trunk/build.xml (original)
+++ hadoop/avro/trunk/build.xml Wed Apr 15 05:24:09 2009
@@ -138,7 +138,7 @@
     </javac> 
   </target>
 
-  <target name="test" depends="test-java,test-py"/>
+  <target name="test" depends="test-java,test-py,test-interop"/>
 
   <target name="compile-test-schemata" depends="compile-java">
     <taskdef name="protocol" classname="org.apache.avro.specific.ProtocolTask">
@@ -179,7 +179,7 @@
 	       outputDirectory="${test.java.classes}"/>
   </target>
 
-  <target name="test-java" depends="compile-java-test,generate-test-data"
+  <target name="test-java" depends="compile-java-test"
 	  description="Run java unit tests">
 
     <junit showoutput="yes"
@@ -229,8 +229,7 @@
     </py-run>
   </target>
 
-  <target name="test-py" depends="generate-test-data"
-    description="Run python unit tests">
+  <target name="test-py" depends="init" description="Run python unit tests">
     <taskdef name="py-test" classname="org.pyant.tasks.PythonTestTask">
       <classpath refid="java.classpath" />
     </taskdef>
@@ -239,7 +238,103 @@
         <include name="test*.py"/>
       </fileset>
     </py-test>
-  </target>   
+  </target>
+
+  <target name="test-interop" depends="test-interop-java,test-interop-py"
+   description="Run multiple languages interoperability tests">
+  </target>
+
+  <target name="test-interop-java" 
+    depends="test-interop-data-java,test-interop-rpc-java"
+   description="Run java interoperability tests">
+  </target>
+
+  <target name="test-interop-py" 
+    depends="test-interop-data-py,test-interop-rpc-py"
+   description="Run python interoperability tests">
+  </target>
+
+  <target name="test-interop-data-java" depends="generate-test-data" 
+    description="Run java data file interoperability tests">
+    <junit showoutput="yes" 
+      printsummary="withOutAndErr"
+      haltonfailure="no"
+      errorProperty="tests.failed" failureProperty="tests.failed">
+      <sysproperty key="test.count" value="${test.count}"/>
+      <sysproperty key="test.dir" value="${test.java.build.dir}"/>
+      <sysproperty key="test.validate" value="${test.validate}"/>
+      <classpath refid="test.java.classpath"/>
+      <formatter type="plain" />
+      <test name="org.apache.avro.TestDataFile$InteropTest" 
+        todir="${test.java.build.dir}"/>
+    </junit>
+    <fail if="tests.failed">Tests failed!</fail>
+  </target>
+
+  <target name="test-interop-data-py" depends="generate-test-data" 
+    description="Run python data file interoperability tests">
+    <taskdef name="py-test" classname="org.pyant.tasks.PythonTestTask">
+      <classpath refid="java.classpath" />
+    </taskdef>
+    <py-test python="python" pythonpathref="test.py.path" >
+      <fileset dir="${basedir}/src/test/py">
+        <include name="interop*.py"/>
+      </fileset>
+    </py-test>
+  </target>
+
+  <target name="start-rpc-daemons" depends="compile-java-test"
+    description="Start the daemons for rpc interoperability tests">
+    <delete dir="${test.java.build.dir}/server-ports"/>
+    <mkdir dir="${test.java.build.dir}/server-ports"/>
+    <taskdef name="py-run" classname="org.pyant.tasks.PythonRunTask">
+      <classpath refid="java.classpath" />
+    </taskdef>
+    <!-- Start the servers. Since the servers block ant's main thread, they
+    need to be started in parallel threads. -->
+    <parallel>
+      <daemons>
+        <java classname="org.apache.avro.TestProtocolSpecific$InteropTest">
+          <classpath refid="test.java.classpath"/>
+          <sysproperty key="test.dir" value="${test.java.build.dir}"/>
+        </java>
+        <py-run script="${basedir}/src/test/py/interoptests.py" 
+          python="python" pythonpathref="test.py.path">
+          <arg value="server"/>
+        </py-run>
+      </daemons>
+
+      <!-- Give some time to start -->
+      <sequential>
+        <sleep seconds="2"/>
+      </sequential>
+    </parallel>
+  </target>
+
+  <target name="test-interop-rpc-java" depends="start-rpc-daemons"
+    description="Run java rpc interoperability tests">
+    <junit showoutput="yes" 
+      printsummary="withOutAndErr"
+      haltonfailure="no"
+      errorProperty="tests.failed" failureProperty="tests.failed">
+      <sysproperty key="test.count" value="${test.count}"/>
+      <sysproperty key="test.dir" value="${test.java.build.dir}"/>
+      <sysproperty key="test.validate" value="${test.validate}"/>
+      <classpath refid="test.java.classpath"/>
+      <formatter type="plain" />
+      <test name="org.apache.avro.TestProtocolSpecific$InteropTest" 
+        todir="${test.java.build.dir}"/>
+    </junit>
+    <fail if="tests.failed">Tests failed!</fail>
+  </target>
+
+  <target name="test-interop-rpc-py" depends="start-rpc-daemons"
+    description="Run java rpc interoperability tests">
+        <py-run script="${basedir}/src/test/py/interoptests.py" 
+          python="python" pythonpathref="test.py.path">
+          <arg value="client"/>
+        </py-run>
+  </target>
 
   <target name="pydoc" description="Generate python api docs">
     <taskdef name="py-doc" classname="org.pyant.tasks.PythonDocTask">

Modified: hadoop/avro/trunk/src/py/avro/ipc.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/ipc.py?rev=765053&r1=765052&r2=765053&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/ipc.py (original)
+++ hadoop/avro/trunk/src/py/avro/ipc.py Wed Apr 15 05:24:09 2009
@@ -185,7 +185,8 @@
     """Writes an error message."""
     pass
 
-_STRUCT_INT = struct.Struct('i')
+_STRUCT_INT = struct.Struct('!I')
+_BUFFER_SIZE = 8192
 class SocketTransceiver(TransceiverBase):
   """A simple socket-based Transceiver implementation."""
 
@@ -193,20 +194,32 @@
     self.__sock = sock
 
   def readbuffers(self):
-    size = self.__readlength()
-    if size == 0:
-      return ''
     msg = cStringIO.StringIO()
-    while msg.tell() < size:
-      chunk = self.__sock.recv(size-msg.tell())
-      if chunk == '':
-        raise ConnectionClosedException("socket read 0 bytes")
-      msg.write(chunk)
-    if self.__readlength() != 0:
-      raise ConnectionClosedException("socket read 0 bytes")
-    return msg.getvalue()
+    while True:
+      buffer = cStringIO.StringIO()
+      size = self.__readlength()
+      if size == 0:
+        return msg.getvalue()
+      while buffer.tell() < size:
+        chunk = self.__sock.recv(size-buffer.tell())
+        if chunk == '':
+          raise ConnectionClosedException("socket read 0 bytes")
+        buffer.write(chunk)
+      msg.write(buffer.getvalue())
 
   def writebuffers(self, msg):
+    totalsize = len(msg)
+    totalsent = 0
+    while totalsize-totalsent > 0:
+      if totalsize-totalsent > _BUFFER_SIZE:
+        batchsize = _BUFFER_SIZE
+      else:
+        batchsize = totalsize-totalsent
+      self.__writebuffer(msg[totalsent:(totalsent+batchsize)])
+      totalsent = totalsent + batchsize
+    self.__writelength(0) #null terminate
+
+  def __writebuffer(self, msg):
     size = len(msg)
     self.__writelength(size)
     totalsent = 0
@@ -215,7 +228,6 @@
       if sent == 0:
         raise ConnectionClosedException("socket sent 0 bytes")
       totalsent = totalsent + sent
-    self.__writelength(0) #null terminate
 
   def __writelength(self, len):
     sent = self.__sock.sendall(_STRUCT_INT.pack(len))
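
The ipc.py change above switches the length prefix to a 4-byte network-order unsigned int ('!I') so both sides agree on the byte order, and frames each message as a sequence of buffers of at most 8192 bytes, terminated by a zero-length buffer. A minimal standalone sketch of that framing (frame/unframe are illustrative helpers, not part of the patch):

    import struct
    from io import BytesIO

    _STRUCT_INT = struct.Struct('!I')   # 4-byte network-order unsigned length
    _BUFFER_SIZE = 8192                 # maximum payload per framed buffer

    def frame(message):
        """Split a message into length-prefixed buffers, ending with a zero length."""
        out = BytesIO()
        for start in range(0, len(message), _BUFFER_SIZE):
            chunk = message[start:start + _BUFFER_SIZE]
            out.write(_STRUCT_INT.pack(len(chunk)))
            out.write(chunk)
        out.write(_STRUCT_INT.pack(0))   # empty buffer terminates the message
        return out.getvalue()

    def unframe(data):
        """Reassemble a framed byte string into the original message."""
        msg, pos = BytesIO(), 0
        while True:
            (size,) = _STRUCT_INT.unpack_from(data, pos)
            pos += _STRUCT_INT.size
            if size == 0:
                return msg.getvalue()
            msg.write(data[pos:pos + size])
            pos += size

For a 20000-byte message, frame() emits an 8192-byte, an 8192-byte and a 3616-byte buffer plus the 4-byte zero terminator, which is the sequence the new readbuffers() loop reassembles on the other end.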

Modified: hadoop/avro/trunk/src/py/avro/schema.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/schema.py?rev=765053&r1=765052&r2=765053&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/schema.py (original)
+++ hadoop/avro/trunk/src/py/avro/schema.py Wed Apr 15 05:24:09 2009
@@ -217,7 +217,13 @@
 
 class _Names(dict):
   def __init__(self, names=_PRIMITIVES):
-    self.update(names)
+    self.__defaults = names
+
+  def get(self, key):
+    val = dict.get(self, key)
+    if val is None:
+      val = self.__defaults.get(key)
+    return val
 
   def __setitem__(self, key, val):
     if dict.get(self, key) is not None:
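
With this change _Names no longer copies the primitive schemas into each instance: get() consults the instance first and then falls back to the shared defaults table, so the dict itself only ever holds user-defined names and the redefinition check in __setitem__ (context above) only applies to those. A standalone sketch of the lookup pattern, with placeholder values and an illustrative class name (the error raised on redefinition is an assumption based on the surrounding context):

    class ChainedNames(dict):
        """Lookups fall back to a shared, read-only defaults table;
        names stored locally may not be redefined."""

        def __init__(self, defaults):
            dict.__init__(self)
            self.__defaults = defaults

        def get(self, key):
            val = dict.get(self, key)
            if val is None:
                val = self.__defaults.get(key)
            return val

        def __setitem__(self, key, val):
            if dict.get(self, key) is not None:
                raise KeyError("Can't redefine: " + key)   # assumed behaviour
            dict.__setitem__(self, key, val)

    names = ChainedNames({"string": "<string schema>", "int": "<int schema>"})
    names["Rec"] = "<record schema>"
    assert names.get("int") == "<int schema>"     # resolved from the defaults
    assert names.get("Rec") == "<record schema>"  # stored locally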

Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java?rev=765053&r1=765052&r2=765053&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java Wed Apr 15 05:24:09 2009
@@ -85,24 +85,8 @@
     }
   }
 
-  public void testGeneratedGeneric() throws IOException {
-    readFiles(new GenericDatumReader<Object>());
-  }
-
-  public void testGeneratedSpecific() throws IOException {
-    readFiles(new SpecificDatumReader("org.apache.avro."));
-  }
-
-  public void testGeneratedReflect() throws IOException {
-    readFiles(new ReflectDatumReader("org.apache.avro."));
-  }
-
-  private void readFiles(DatumReader<Object> datumReader) throws IOException {
-    for (File f : DATAFILE_DIR.listFiles())
-      readFile(f, datumReader, true);
-  }
-
-  private void readFile(File f, DatumReader<Object> datumReader, boolean reuse)
+  protected void readFile(File f, 
+      DatumReader<Object> datumReader, boolean reuse)
     throws IOException {
     System.out.println("Reading "+ f.getName());
     DataFileReader<Object> reader =
@@ -128,4 +112,24 @@
     System.out.println("Time: "+(System.currentTimeMillis()-start));
   }
 
+  public static class InteropTest extends TestCase {
+
+    public void testGeneratedGeneric() throws IOException {
+      readFiles(new GenericDatumReader<Object>());
+    }
+
+    public void testGeneratedSpecific() throws IOException {
+      readFiles(new SpecificDatumReader("org.apache.avro."));
+    }
+
+    public void testGeneratedReflect() throws IOException {
+      readFiles(new ReflectDatumReader("org.apache.avro."));
+    }
+
+    private void readFiles(DatumReader<Object> datumReader) throws IOException {
+      TestDataFile test = new TestDataFile();
+      for (File f : DATAFILE_DIR.listFiles())
+        test.readFile(f, datumReader, true);
+    }
+  }
 }

Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java?rev=765053&r1=765052&r2=765053&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java Wed Apr 15 05:24:09 2009
@@ -42,6 +42,9 @@
   private static final Logger LOG
     = LoggerFactory.getLogger(TestProtocolSpecific.class);
 
+  private static final File SERVER_PORTS_DIR
+  = new File(System.getProperty("test.dir", "/tmp")+"/server-ports/");
+
   private static final File FILE = new File("src/test/schemata/test.js");
   private static final Protocol PROTOCOL;
   static {
@@ -117,4 +120,39 @@
   public void testStopServer() {
     server.close();
   }
+
+  public static class InteropTest extends TestCase {
+
+    public void testClient() throws Exception {
+      for (File f : SERVER_PORTS_DIR.listFiles()) {
+        LineNumberReader reader = new LineNumberReader(new FileReader(f));
+        int port = Integer.parseInt(reader.readLine());
+        System.out.println("Validating java client to "+
+            f.getName()+" - " + port);
+        Transceiver client = new SocketTransceiver(
+            new InetSocketAddress("localhost", port));
+        proxy = (Test)SpecificRequestor.getClient(Test.class, client);
+        TestProtocolSpecific proto = new TestProtocolSpecific();
+        proto.testHello();
+        proto.testEcho();
+        proto.testEchoBytes();
+        proto.testError();
+        System.out.println("Done! Validation java client to "+
+            f.getName()+" - " + port);
+      }
+    }
+
+    /**
+     * Starts the RPC server.
+     */
+    public static void main(String[] args) throws Exception {
+      SocketServer server = new SocketServer(
+          new SpecificResponder(Test.class, new TestImpl()),
+          new InetSocketAddress(0));
+      File portFile = new File(SERVER_PORTS_DIR, "java-port");
+      FileWriter w = new FileWriter(portFile);
+      w.write(Integer.toString(server.getPort()));
+      w.close();
+    }
+  }
 }
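
The RPC interop tests rendezvous through the filesystem: each server (the Java InteropTest.main above, the Python _interopserver below) binds an ephemeral port and writes it to a file under server-ports/ ("java-port", "py-port"), and each client lists that directory and connects to every advertised port, whichever language is listening. A language-neutral sketch of the two halves of that handshake (helper names are illustrative):

    import os

    def advertise_port(ports_dir, name, port):
        """Server side: publish the bound port where interop clients can find it."""
        f = open(os.path.join(ports_dir, name), "w")
        f.write(str(port))
        f.close()

    def discover_ports(ports_dir):
        """Client side: return (file name, port) for every advertised server."""
        servers = []
        for name in os.listdir(ports_dir):
            f = open(os.path.join(ports_dir, name))
            servers.append((name, int(f.read().strip())))
            f.close()
        return servers

Each client then runs the same checks (hello, echo, echoBytes, error) against every discovered server, so the Java client exercises the Python server and vice versa.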

Added: hadoop/avro/trunk/src/test/py/interoptests.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/py/interoptests.py?rev=765053&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/py/interoptests.py (added)
+++ hadoop/avro/trunk/src/test/py/interoptests.py Wed Apr 15 05:24:09 2009
@@ -0,0 +1,90 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements.  See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership.  The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.
+
+import unittest, os, sys, socket
+import avro.schema as schema
+import avro.io as io
+import avro.ipc as ipc
+import avro.generic as generic
+import avro.reflect as reflect
+import testio, testipc, testioreflect, testipcreflect
+
+_DATAFILE_DIR = "build/test/data-files/"
+_SERVER_PORTS_DIR = testio._DIR + "server-ports/"
+
+class TestGeneratedFiles(unittest.TestCase):
+
+  def __init__(self, methodName, validator=generic.validate, 
+               datumreader=generic.DatumReader):
+    unittest.TestCase.__init__(self, methodName)
+    self.__validator = validator
+    self.__datumreader = datumreader
+
+  def testreadfiles(self):
+    origschm = schema.parse(open("src/test/schemata/interop.js").read())
+    for file in os.listdir(_DATAFILE_DIR):
+      print "Validating:", file.__str__()
+      dr = io.DataFileReader(open(_DATAFILE_DIR+file, "rb"), 
+                             self.__datumreader())
+      count = int(dr.getmeta("count"))
+      decodedSchm = schema.parse(dr.getmeta("schema"))
+      self.assertEquals(schema.stringval(origschm), 
+                        schema.stringval(decodedSchm))
+      for i in range(0,count):
+        datum = dr.next()
+        self.assertTrue(self.__validator(origschm, datum))
+
+class TestReflectGeneratedFiles(TestGeneratedFiles):
+
+  def __init__(self, methodName):
+    TestGeneratedFiles.__init__(self, methodName, 
+                                       testioreflect.dyvalidator, 
+                                       testioreflect.ReflectDReader)
+
+def _interopclient():
+  for file in os.listdir(_SERVER_PORTS_DIR):
+    port = open(_SERVER_PORTS_DIR+file).read()
+    print "Validating python client to", file, "-", port
+    sock = socket.socket()
+    sock.connect(("localhost", int(port)))
+    client = ipc.SocketTransceiver(sock)
+    testproto = testipcreflect.TestProtocol("testipc")
+    testproto.proxy = reflect.getclient(testipc.PROTOCOL, client)
+    testproto.checkhello()
+    testproto.checkecho()
+    testproto.checkechobytes()
+    testproto.checkerror()
+    print "Done! Validation python client to", file, "-", port
+
+def _interopserver():
+  addr = ('localhost', 0)
+  responder = reflect.ReflectResponder(testipc.PROTOCOL, 
+                                       testipcreflect.TestImpl())
+  server = ipc.SocketServer(responder, addr)
+  file = open(_SERVER_PORTS_DIR+"py-port", "w")
+  file.write(str(server.getaddress()[1]))
+  file.close()
+
+if __name__ == '__main__':
+  usage = "Usage: testipcrefect.py <client/server>"
+  if len(sys.argv) != 2:
+    print usage
+    exit
+  if sys.argv[1] == "client":
+    _interopclient()
+  elif sys.argv[1] == "server":
+    _interopserver()
+  else:
+    print usage
\ No newline at end of file

Modified: hadoop/avro/trunk/src/test/py/testio.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/py/testio.py?rev=765053&r1=765052&r2=765053&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/py/testio.py (original)
+++ hadoop/avro/trunk/src/test/py/testio.py Wed Apr 15 05:24:09 2009
@@ -182,31 +182,9 @@
       if self.__assertdata:
         self.assertEquals(data, dr.next())
 
-class TestGeneratedFiles(unittest.TestCase):
-
-  def __init__(self, methodName, validator=generic.validate, 
-               datumreader=generic.DatumReader):
-    unittest.TestCase.__init__(self, methodName)
-    self.__validator = validator
-    self.__datumreader = datumreader
-
-  def testreadfiles(self):
-    origschm = schema.parse(open("src/test/schemata/interop.js").read())
-    dir = _DIR + "data-files/"
-    for file in os.listdir(dir):
-      print "Validating:", file.__str__()
-      dr = io.DataFileReader(open(dir+file, "rb"), self.__datumreader())
-      count = int(dr.getmeta("count"))
-      decodedSchm = schema.parse(dr.getmeta("schema"))
-      self.assertEquals(schema.stringval(origschm), 
-                        schema.stringval(decodedSchm))
-      for i in range(0,count):
-        datum = dr.next()
-        self.assertTrue(self.__validator(origschm, datum))
-
 if __name__ == '__main__':
   if len(sys.argv) != 4:
-    print "Usage: testavro.py <schemafile> <outputfile> <count>"
+    print "Usage: testio.py <schemafile> <outputfile> <count>"
     exit
   schm = schema.parse(open(sys.argv[1]).read())
   file = sys.argv[2]

Modified: hadoop/avro/trunk/src/test/py/testioreflect.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/py/testioreflect.py?rev=765053&r1=765052&r2=765053&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/py/testioreflect.py (original)
+++ hadoop/avro/trunk/src/test/py/testioreflect.py Wed Apr 15 05:24:09 2009
@@ -59,8 +59,3 @@
     testio.TestSchema.__init__(self, methodName, dyvalidator, ReflectDWriter,
                                ReflectDReader, DyRandomData, False)
 
-class TestGeneratedFiles(testio.TestGeneratedFiles):
-
-  def __init__(self, methodName):
-    testio.TestGeneratedFiles.__init__(self, methodName, 
-                                       dyvalidator, ReflectDReader)