You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2013/09/23 17:56:17 UTC

svn commit: r1525632 [3/3] - in /pig/trunk: ./ src/org/apache/pig/impl/builtin/ src/org/apache/pig/impl/streaming/ src/org/apache/pig/impl/util/ src/org/apache/pig/scripting/ src/org/apache/pig/scripting/streaming/ src/org/apache/pig/scripting/streamin...

Added: pig/trunk/test/python/streaming/test_controller.py
URL: http://svn.apache.org/viewvc/pig/trunk/test/python/streaming/test_controller.py?rev=1525632&view=auto
==============================================================================
--- pig/trunk/test/python/streaming/test_controller.py (added)
+++ pig/trunk/test/python/streaming/test_controller.py Mon Sep 23 15:56:16 2013
@@ -0,0 +1,314 @@
+import unittest
+import controller
+import StringIO
+import sys
+
+from datetime import datetime
+
+class TestDeserializer( unittest.TestCase ):
+    def test__no_params(self):
+        input = ""
+        expected_output = []
+        out = controller.deserialize_input(input)
+        self.assertEquals(expected_output, out)
+
+    def test__chararray(self):
+        input = "C1234"
+        expected_output = ["1234"]
+        out = controller.deserialize_input(input)
+        self.assertEquals(expected_output, out)
+
+    def test__empty_chararray(self):
+        input = "C"
+        expected_output = [""]
+        out = controller.deserialize_input(input)
+        self.assertEquals(expected_output, out)
+
+    def test__null_chararray(self):
+        input = "|-_"
+        expected_output = [None]
+        out = controller.deserialize_input(input)
+        self.assertEquals(expected_output, out)
+
+    def test__boolean(self):
+        input = "Btrue"
+        expected_output = [True]
+        out = controller.deserialize_input(input)
+        self.assertEquals(expected_output, out)
+
+    def test__datetime_dateutil(self):
+        controller.USE_DATEUTIL = True
+        try:
+            from dateutil.tz import *
+            from dateutil import parser
+            input = "T2008-09-03T20:56:35.450+00:00"
+            expected_output = [ datetime(2008, 9, 3, 20, 56, 35, 450000, tzinfo=tzutc()) ]
+            out = controller.deserialize_input(input)
+            self.assertEquals(expected_output, out)
+        except ImportError:
+            pass
+
+    def test__datetime_datetime(self):
+        controller.USE_DATEUTIL = False
+
+        input = "T2008-09-03T20:56:36.444+00:00"
+        expected_output = [ datetime(2008, 9, 3, 20, 56, 36, 444000) ]
+        out = controller.deserialize_input(input)
+        self.assertEquals(expected_output, out)
+
+        controller.USE_DATEUTIL = True
+
+
+    def test__two_elements(self):
+        input = "C032550737A79C543|\t_I970916083725"
+        expected_output = [ "032550737A79C543", 970916083725 ]
+        out = controller.deserialize_input(input)
+        self.assertEquals(expected_output, out)
+
+    def test__three_elements_one_null(self):
+        input = "C032550737A79C543|\t_I970916083725|\t_|-_"
+        expected_output = [ "032550737A79C543", 970916083725, None ]
+        out = controller.deserialize_input(input)
+        self.assertEquals(expected_output, out)
+
+    def test__bag(self):
+        input = "|{_|(_C79C543|,_I9709|,_Crichard keith|)_|,_|(_C79C543|,_I97091|,_Cmicrosoft works|)_|,_|(_C79C543|,_I970|,_Csearch engines|)_|}_"
+        expected_output = [ [ ("79C543", 9709, "richard keith"),
+                              ("79C543", 97091, "microsoft works"),
+                              ("79C543", 970, "search engines") ] ]
+        out = controller.deserialize_input(input)
+        self.assertEquals(expected_output, out)
+
+    def test__two_elements_bags(self):
+        input = "|{_|(_C79C543|,_D97091608|,_Crichard keith frazine|)_|,_|(_C79C543|,_D97091609|,_Cmicrosoft works|)_|}_|\t_|{_|(_C79C543|,_D97091608|)_|,_|(_C79C543|,_D9709160|)_|}_"
+        expected_output =  [[("79C543",float(97091608),"richard keith frazine"),
+                             ("79C543", float(97091609), "microsoft works") ],
+                            [("79C543", float(97091608)),
+                             ("79C543", float(9709160))]]
+        out = controller.deserialize_input(input)
+        self.assertEquals(expected_output, out)
+
+    def test__atomic_mixed_with_complex(self):
+        input = "|{_|(_C79C543|,_L970916083725|,_Crichard keith frazine|)_|,_|(_C79C543|,_L970916095254|,_Cmicrosoft works|)_|}_|\t_I323|\t_|{_|(_C79C543|,_L970916083725|,_Crichard keith frazine|)_|,_|(_C79C543|,_L970916095254|,_Cmicrosoft works|)_|}_"
+        expected_output = [ [ ("79C543", 970916083725, "richard keith frazine"),
+                              ("79C543", 970916095254, "microsoft works") ],
+                            323,
+                            [ ("79C543", 970916083725, "richard keith frazine"),
+                              ("79C543", 970916095254, "microsoft works") ] ]
+        out = controller.deserialize_input(input)
+        self.assertEquals(expected_output, out)
+
+    def test__short_tuple(self):
+        input = "|(_I1|)_"
+        expected_output = [ (1,) ]
+        out = controller.deserialize_input(input)
+        self.assertEquals(expected_output, out)
+
+    def test__nested_tuple(self):
+        input = "|(_|(_I123|,_Cabc|)_|,_|(_Cdef|,_I456|)_|)_"
+        expected_output = [ ( (123, "abc"), ("def", 456) ) ]
+        out = controller.deserialize_input(input)
+        self.assertEquals(expected_output, out)
+
+    def test__map(self):
+        input = "|[_Cname#CJohn|,_Cext#I5555|]_"
+        expected_output = [ {"name":u"John",
+                             "ext":5555} ]
+        out = controller.deserialize_input(input)
+        self.assertEquals(expected_output, out)
+
+    def test__short_field_map(self):
+        input = "|[_Cn#CJohn|,_Ce#C5555|]_"
+        expected_output = [ {"n":u"John",
+                             "e":u"5555"} ]
+        out = controller.deserialize_input(input)
+        self.assertEquals(expected_output, out)
+
+    def test__complex_map(self):
+        input = "|[_CD#|[_CA#I1|,_CB#CE|]_|,_CC#CF|]_"
+        expected_output = [ { u"D": { u"A": 1,
+                                     u"B": u"E"
+                                   },
+                              u"C": u"F"
+                            } ]
+        out = controller.deserialize_input(input)
+        self.assertEquals(expected_output, out)
+
+    def test__all_types(self):
+        input = "A123|\t_Btrue|\t_Cabc|\t_D4.0|\t_F5.0|\t_I32|\t_L45"
+        expected_output = [ b"123", True, u"abc", 4.0, 5.0, 32, 45L ]
+        out = controller.deserialize_input(input)
+        self.assertEquals(expected_output, out)
+
+    def test__bad_types(self):
+        input = "K123"
+        self.assertRaises(Exception, controller.deserialize_input, [input])
+
+    def test__error(self):
+        self.maxDiff = None
+        input = "|[_Cspec#|{_|(_|[_Croles#|{_|(_Chadoop-namenode|)_|,_|(_Chadoop-jobtracker|)_|,_|(_Cpig-master|)_|}_|,_Cnum_instances#I1|]_|)_|,_|(_|[_Croles#|{_|(_Chadoop-datanode|)_|,_|(_Chadoop-tasktracker|)_|}_|,_Cnum_instances#I1|]_|)_|}_|,_Chardware#Cm1.large|,_Caccount_id#I1234567|,_Clocation#Cus-east-1b|,_Cstatus#Cdestroyed|,_Cimage#Cus-east-1/ami-1234|,_Cjclouds_name#Cmhcdevelopment_1234|,_Cinstances#|{_|(_|[_Cprivate_address#C10.10.10.10|,_Croles#|{_|(_Chadoop-datanode|)_|,_|(_Chadoop-tasktracker|)_|}_|,_Cpublic_address#Cec2-10-10-10-10.compute-1.amazonaws.com|,_Cinstance_id#Cus-east-1/i-1234|]_|)_|,_|(_|[_Cprivate_address#C10.10.10.10|,_Croles#|{_|(_Chadoop-namenode|)_|,_|(_Chadoop-jobtracker|)_|,_|(_Cpig-master|)_|}_|,_Cpublic_address#Cec2-10-10-10-10.compute-1.amazonaws.com|,_Cinstance_id#Cus-east-1/i-4321|]_|)_|}_|,_Cstop_timestamp#|-_|,_Cplan_code#Cstandard|,_C_id#I1234567890|,_Crunning_timestamp#|-_|,_Cuser_id#I1234|,_Cstart_timestamp#CTue Oct 25 19:26:18 UTC 2011|]
 _"
+        expected_output = [ {u'status': u'destroyed',
+                             u'start_timestamp': u'Tue Oct 25 19:26:18 UTC 2011',
+                             u'user_id': 1234,
+                             u'account_id': 1234567,
+                             u'running_timestamp': None,
+                             u'image': u'us-east-1/ami-1234',
+                             u'hardware': u'm1.large',
+                             u'instances': [ ( { u'private_address':u'10.10.10.10',
+                                                 u'roles' : [ ( u'hadoop-datanode', ),
+                                                              ( u'hadoop-tasktracker',) ],
+                                                 u'public_address':u'ec2-10-10-10-10.compute-1.amazonaws.com',
+                                                 u'instance_id':u'us-east-1/i-1234'
+                                               },
+                                             ),
+                                             ( {
+                                                 u'private_address':u'10.10.10.10',
+                                                 u'roles' : [ ( u'hadoop-namenode', ),
+                                                              ( u'hadoop-jobtracker', ),
+                                                              ( u'pig-master', ) ],
+                                                 u'public_address' : u'ec2-10-10-10-10.compute-1.amazonaws.com',
+                                                 u'instance_id' : u'us-east-1/i-4321'
+                                               },
+                                             )
+                                           ],
+                             u'plan_code': u'standard',
+                             u'location': u'us-east-1b',
+                             u'_id': 1234567890,
+                             u'spec': [ ( { u'roles' : [ (u'hadoop-namenode', ),
+                                                         (u'hadoop-jobtracker', ),
+                                                         (u'pig-master', ) ],
+                                            u'num_instances' : 1,
+                                          },
+                                        ),
+                                        ( { u'roles' : [ (u'hadoop-datanode',),
+                                                         (u'hadoop-tasktracker',) ],
+                                            u'num_instances' : 1,
+                                          },
+                                        )
+                                      ],
+                             u'jclouds_name': u'mhcdevelopment_1234',
+                             u'stop_timestamp': None}]
+        out = controller.deserialize_input(input)
+        self.assertEquals(expected_output, out)
+
+    def test__empty_string(self):
+        input = "C"
+        expected_output = [ "" ]
+        out = controller.deserialize_input(input)
+        self.assertEquals(expected_output, out)
+
+    def test__string_with_hash(self):
+        input = "Cabc#!,|g|,(){}"
+        expected_output = [ "abc#!,|g|,(){}" ]
+        out = controller.deserialize_input(input)
+        self.assertEquals(expected_output, out)
+
+class TestSerializeOutput( unittest.TestCase ):
+    def test__chararray(self):
+        input = "1234"
+        expected_output = "1234"
+        out = controller.serialize_output(input)
+        self.assertEquals(expected_output, out)
+
+    def test__empty_chararray(self):
+        input = ""
+        expected_output = ""
+        out = controller.serialize_output(input)
+        self.assertEquals(expected_output, out)
+
+    def test__null_chararray(self):
+        input = None
+        expected_output = "|-_"
+        out = controller.serialize_output(input)
+        self.assertEquals(expected_output, out)
+
+    def test__num(self):
+        input = 1234
+        expected_output = "1234"
+        out = controller.serialize_output(input)
+        self.assertEquals(expected_output, out)
+
+    def test__bool_true(self):
+        input = True
+        expected_output = "true"
+        out = controller.serialize_output(input)
+        self.assertEquals(expected_output, out)
+
+    def test__bool_false(self):
+        input = False
+        expected_output = "false"
+        out = controller.serialize_output(input)
+        self.assertEquals(expected_output, out)
+
+    def test__datetime(self):
+        d = datetime.now()
+        input = (d,)
+        expected_output = "|(_%s|)_" % d.isoformat()
+        out = controller.serialize_output(input)
+        self.assertEquals(expected_output, out)
+
+    def test__tuple(self):
+        input = (1234, "abc")
+        expected_output = "|(_1234|,_abc|)_"
+        out = controller.serialize_output(input)
+        self.assertEquals(expected_output, out)
+
+    def test__short_tuple(self):
+        input = (1,)
+        expected_output = "|(_1|)_"
+        out = controller.serialize_output(input)
+        self.assertEquals(expected_output, out)
+
+    def test__bag(self):
+        input = [1234, "abc"]
+        expected_output = "|{_1234|,_abc|}_"
+        out = controller.serialize_output(input)
+        self.assertEquals(expected_output, out)
+
+    def test__nested_tuple(self):
+        input = [(32,12,'abc'), 32, ['abc', 'def', 'ghi']]
+        expected_output = "|{_|(_32|,_12|,_abc|)_|,_32|,_|{_abc|,_def|,_ghi|}_|}_"
+        out = controller.serialize_output(input)
+        self.assertEquals(expected_output, out)
+
+    def test__map(self):
+        input = {'a': 1, 'b':'z'}
+        expected_output = "|[_a#1|,_b#z|]_"
+        out = controller.serialize_output(input)
+        self.assertEquals(expected_output, out)
+
+    def test__bug(self):
+        input = (None, 32, 98765432109876543210L)
+        expected_output = "|(_|-_|,_32|,_98765432109876543210|)_"
+        out = controller.serialize_output(input)
+        self.assertEquals(expected_output, out)
+
+class TestReadInput( unittest.TestCase ):
+    def test__multiline_record(self):
+        cont = controller.PythonStreamingController()
+        inputio = StringIO.StringIO()
+        inputio.write('12\n')
+        inputio.write('34\n')
+        inputio.write('5|_\n')
+        inputio.seek(0)
+
+        cont.input_stream = inputio
+        cont.output_stream = sys.stdout
+        out = cont.get_next_input()
+
+        self.assertEquals('12\n34\n5', out)
+
+    def test__complexmultiline_record(self):
+        cont = controller.PythonStreamingController()
+        inputio = StringIO.StringIO()
+        inputio.write('|{_|(_32|,_12|,_a\n')
+        inputio.write('bc|)_|,_32|,_|{_ab\n')
+        inputio.write('c|,_def|,_gh\n')
+        inputio.write('i|}_|}_|_\n')
+        inputio.seek(0)
+
+        cont.input_stream = inputio
+        cont.output_stream = sys.stdout
+        out = cont.get_next_input()
+
+        self.assertEquals('|{_|(_32|,_12|,_a\nbc|)_|,_32|,_|{_ab\nc|,_def|,_gh\ni|}_|}_', out)

Modified: pig/trunk/test/unit-tests
URL: http://svn.apache.org/viewvc/pig/trunk/test/unit-tests?rev=1525632&r1=1525631&r2=1525632&view=diff
==============================================================================
--- pig/trunk/test/unit-tests (original)
+++ pig/trunk/test/unit-tests Mon Sep 23 15:56:16 2013
@@ -47,10 +47,12 @@
 **/TestPackage.java
 **/TestParamSubPreproc.java
 **/TestPigBytesRawComparator.java
+**/TestPigStreamingUDF.java
 **/TestPhyOp.java
 **/TestPigScriptParser.java
 **/TestPigSplit.java
 **/TestPigStats.java
+**/TestPigStreaming.java
 **/TestPinOptions.java
 **/TestPOBinCond.java
 **/TestPOCast.java
@@ -68,6 +70,7 @@
 **/TestResourceSchema.java
 **/TestSchemaUtil.java
 **/TestStreamingLocal.java
+**/TestStreamingUDFOutputHandler.java
 **/TestSubtract.java
 **/TestTextDataParser.java
 **/TestTupleFormat.java