Posted to commits@pig.apache.org by da...@apache.org on 2010/03/05 08:18:26 UTC

svn commit: r919320 - /hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/TestPigStorageSchema.java

Author: daijy
Date: Fri Mar  5 07:18:25 2010
New Revision: 919320

URL: http://svn.apache.org/viewvc?rev=919320&view=rev
Log:
PIG-1261: PigStorageSchema broke after changes to ResourceSchema

Added:
    hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/TestPigStorageSchema.java
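
For context on the fix: PigStorageSchema stores a relation's schema alongside the data it writes, so a later LOAD through the same function with no AS clause gets that schema back, and the new test below checks exactly that round trip against a MiniCluster. A minimal stand-alone sketch of the same flow, assuming a local-mode PigServer and an existing delimited input file; the class name, file names, and use of local mode here are illustrative and not part of the commit:

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.impl.logicalLayer.schema.Schema;

    public class PigStorageSchemaRoundTripSketch {
        public static void main(String[] args) throws Exception {
            // Local-mode server for illustration; the committed test runs
            // MAPREDUCE against a MiniCluster.
            PigServer pig = new PigServer(ExecType.LOCAL);

            // Load with an explicit schema, then store through PigStorageSchema,
            // which writes the schema out alongside the data.
            pig.registerQuery("a = LOAD 'input' USING org.apache.pig.piggybank.storage.PigStorageSchema() AS (f1:chararray, f2:int);");
            Schema original = pig.dumpSchema("a");
            pig.registerQuery("STORE a INTO 'out' USING org.apache.pig.piggybank.storage.PigStorageSchema();");

            // Reload with no AS clause; the stored schema should be recovered.
            pig.registerQuery("b = LOAD 'out' USING org.apache.pig.piggybank.storage.PigStorageSchema();");
            Schema recovered = pig.dumpSchema("b");

            // Compare the recovered schema with the original, as the test below does.
            System.out.println(Schema.equals(recovered, original, true, false));
        }
    }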

Added: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/TestPigStorageSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/TestPigStorageSchema.java?rev=919320&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/TestPigStorageSchema.java (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/TestPigStorageSchema.java Fri Mar  5 07:18:25 2010
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.pig.piggybank.test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.test.MiniCluster;
+import org.apache.pig.test.Util;
+import org.apache.pig.test.utils.TypeCheckingTestUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+public class TestPigStorageSchema extends TestCase {
+
+    protected ExecType execType = ExecType.MAPREDUCE;
+
+    PigContext pigContext = new PigContext(ExecType.MAPREDUCE, new Properties());
+    Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
+    Map<OperatorKey, LogicalOperator> logicalOpTable = new HashMap<OperatorKey, LogicalOperator>();
+    Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
+    Map<String, String> fileNameMap = new HashMap<String, String>();
+
+    MiniCluster cluster = MiniCluster.buildCluster();
+
+    private PigServer pig;
+
+    @Before
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        String origPath = FileLocalizer.fullPath("originput", pig.getPigContext()); 
+        if (FileLocalizer.fileExists(origPath, pig.getPigContext())) {
+            FileLocalizer.delete(origPath, pig.getPigContext());
+        }
+        Util.createInputFile(cluster, "originput", 
+                new String[] {"A,1", "B,2", "C,3", "D,2",
+                              "A,5", "B,5", "C,8", "A,8",
+                              "D,8", "A,9"});
+
+    }
+    
+    @After
+    @Override
+    protected void tearDown() throws Exception {
+        Util.deleteFile(cluster, "originput");
+        String aoutPath = FileLocalizer.fullPath("aout", pig.getPigContext()); 
+        if (FileLocalizer.fileExists(aoutPath, pig.getPigContext())) {
+            FileLocalizer.delete(aoutPath, pig.getPigContext());
+        }
+    }
+    
+    @Test
+    public void testPigStorageSchema() throws Exception {
+        pigContext.connect();
+        String query = "a = LOAD 'originput' using org.apache.pig.piggybank.storage.PigStorageSchema() as (f1:chararray, f2:int);";
+        pig.registerQuery(query);
+        Schema origSchema = pig.dumpSchema("a");
+        pig.registerQuery("STORE a into 'aout' using org.apache.pig.piggybank.storage.PigStorageSchema();");
+        
+        // aout now has a schema. 
+
+        // Verify that loading aout with no given schema produces
+        // the original schema.
+        
+        pig.registerQuery("b = LOAD 'aout' using org.apache.pig.piggybank.storage.PigStorageSchema();");
+        Schema genSchema = pig.dumpSchema("b");
+        assertTrue("generated schema equals original" , Schema.equals(genSchema, origSchema, true, false));
+        
+        // Verify that giving our own schema works
+        String[] aliases = {"foo", "bar"};
+        byte[] types = {DataType.INTEGER, DataType.LONG};
+        Schema newSchema = TypeCheckingTestUtil.genFlatSchema(
+                aliases, types);
+        pig.registerQuery("c = LOAD 'aout' using org.apache.pig.piggybank.storage.PigStorageSchema() as (foo:int, bar:long);");
+        Schema newGenSchema = pig.dumpSchema("c");
+        assertTrue("explicit schema overrides metadata", Schema.equals(newSchema, newGenSchema, true, false));
+        
+    }
+    
+    @Test
+    public void testSchemaConversion() throws Exception {   
+ 
+        Util.createInputFile(cluster, "originput2", 
+                new String[] {"1", "2", "3", "2",
+                              "5", "5", "8", "8",
+                              "8", "9"});
+        
+        pig.registerQuery("A = LOAD 'originput2' using org.apache.pig.piggybank.storage.PigStorageSchema() as (f:int);");
+        pig.registerQuery("B = group A by f;");
+        Schema origSchema = pig.dumpSchema("B");
+        ResourceSchema rs1 = new ResourceSchema(origSchema);
+        pig.registerQuery("STORE B into 'bout' using org.apache.pig.piggybank.storage.PigStorageSchema();");
+        
+        pig.registerQuery("C = LOAD 'bout' using org.apache.pig.piggybank.storage.PigStorageSchema();");
+        Schema genSchema = pig.dumpSchema("C");
+        ResourceSchema rs2 = new ResourceSchema(genSchema);
+        assertTrue("generated schema equals original" , ResourceSchema.equals(rs1, rs2));
+        
+        pig.registerQuery("C1 = LOAD 'bout' as (a0:int, A: {t: (f:int) } );");
+        pig.registerQuery("D = foreach C1 generate a0, SUM(A);");
+
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] { 
+                        "(1,1L)",
+                        "(2,4L)",
+                        "(3,3L)",
+                        "(5,10L)",
+                        "(8,24L)",
+                        "(9,9L)"
+                });
+        
+        Iterator<Tuple> iter = pig.openIterator("D");
+        int counter = 0;
+        while (iter.hasNext()) {
+            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());      
+        }
+        
+        assertEquals(expectedResults.size(), counter);
+    }
+    
+    @Test
+    public void testSchemaConversion2() throws Exception {   
+ 
+        pig.registerQuery("A = LOAD 'originput' using org.apache.pig.piggybank.storage.PigStorageSchema(',') as (f1:chararray, f2:int);");
+        pig.registerQuery("B = group A by f1;");
+        Schema origSchema = pig.dumpSchema("B");
+        ResourceSchema rs1 = new ResourceSchema(origSchema);
+        pig.registerQuery("STORE B into 'cout' using org.apache.pig.piggybank.storage.PigStorageSchema();");
+        
+        pig.registerQuery("C = LOAD 'cout' using org.apache.pig.piggybank.storage.PigStorageSchema();");
+        Schema genSchema = pig.dumpSchema("C");
+        ResourceSchema rs2 = new ResourceSchema(genSchema);
+        assertTrue("generated schema equals original" , ResourceSchema.equals(rs1, rs2));
+        
+        pig.registerQuery("C1 = LOAD 'cout' as (a0:chararray, A: {t: (f1:chararray, f2:int) } );");
+        pig.registerQuery("D = foreach C1 generate a0, SUM(A.f2);");
+
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] { 
+                        "('A',23L)",
+                        "('B',7L)",
+                        "('C',11L)",
+                        "('D',10L)"
+                });
+        
+        Iterator<Tuple> iter = pig.openIterator("D");
+        int counter = 0;
+        while (iter.hasNext()) {
+            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());      
+        }
+        
+        assertEquals(expectedResults.size(), counter);
+    }
+ 
+}
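
As a footnote to the schema-conversion tests above, which wrap each Schema in a ResourceSchema and compare with ResourceSchema.equals: that conversion is the surface that changed in PIG-1261. A minimal sketch of the conversion in isolation, assuming the ResourceSchema API on trunk at the time; the class name and the example fields are illustrative only:

    import org.apache.pig.ResourceSchema;
    import org.apache.pig.ResourceSchema.ResourceFieldSchema;
    import org.apache.pig.data.DataType;
    import org.apache.pig.impl.logicalLayer.schema.Schema;

    public class ResourceSchemaConversionSketch {
        public static void main(String[] args) throws Exception {
            // Build a simple logical-layer schema: (f1: chararray, f2: int).
            Schema schema = new Schema();
            schema.add(new Schema.FieldSchema("f1", DataType.CHARARRAY));
            schema.add(new Schema.FieldSchema("f2", DataType.INTEGER));

            // Convert it to the ResourceSchema form used by load/store functions.
            ResourceSchema rs = new ResourceSchema(schema);
            for (ResourceFieldSchema field : rs.getFields()) {
                System.out.println(field.getName() + ": "
                        + DataType.findTypeName(field.getType()));
            }

            // Converting the same Schema twice should yield equal ResourceSchemas,
            // which is the property the tests above rely on.
            System.out.println(ResourceSchema.equals(rs, new ResourceSchema(schema)));
        }
    }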