You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/03/05 08:18:26 UTC
svn commit: r919320 -
/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/TestPigStorageSchema.java
Author: daijy
Date: Fri Mar 5 07:18:25 2010
New Revision: 919320
URL: http://svn.apache.org/viewvc?rev=919320&view=rev
Log:
PIG-1261: PigStorageSchema broke after changes to ResourceSchema
Added:
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/TestPigStorageSchema.java
Added: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/TestPigStorageSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/TestPigStorageSchema.java?rev=919320&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/TestPigStorageSchema.java (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/TestPigStorageSchema.java Fri Mar 5 07:18:25 2010
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.pig.piggybank.test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.test.MiniCluster;
+import org.apache.pig.test.Util;
+import org.apache.pig.test.utils.TypeCheckingTestUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+/**
+ * Regression test for PIG-1261 (PigStorageSchema broke after changes to
+ * ResourceSchema). Verifies that schema metadata written by PigStorageSchema
+ * on STORE is read back intact by a subsequent schema-less LOAD, and that an
+ * explicit AS clause overrides the stored metadata.
+ *
+ * NOTE(review): this class mixes JUnit 3 (extends TestCase) with JUnit 4
+ * annotations (@Before/@After/@Test); under a JUnit 3 runner the annotations
+ * are ignored and setUp/tearDown run via the TestCase lifecycle instead —
+ * confirm which runner executes this suite.
+ */
+public class TestPigStorageSchema extends TestCase {
+
+ protected ExecType execType = ExecType.MAPREDUCE;
+
+ // NOTE(review): of the five fields below only pigContext is referenced in
+ // this file (testPigStorageSchema); the four maps appear to be scaffolding
+ // copied from another Pig test — verify before removing. The field
+ // 'aliases' is also shadowed by a local of the same name in
+ // testPigStorageSchema.
+ PigContext pigContext = new PigContext(ExecType.MAPREDUCE, new Properties());
+ Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
+ Map<OperatorKey, LogicalOperator> logicalOpTable = new HashMap<OperatorKey, LogicalOperator>();
+ Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
+ Map<String, String> fileNameMap = new HashMap<String, String>();
+
+ MiniCluster cluster = MiniCluster.buildCluster();
+
+ private PigServer pig;
+
+ // Creates a fresh 'originput' data file on the mini cluster before each
+ // test, deleting any leftover copy first so reruns start clean.
+ @Before
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ String origPath = FileLocalizer.fullPath("originput", pig.getPigContext());
+ if (FileLocalizer.fileExists(origPath, pig.getPigContext())) {
+ FileLocalizer.delete(origPath, pig.getPigContext());
+ }
+ Util.createInputFile(cluster, "originput",
+ new String[] {"A,1", "B,2", "C,3", "D,2",
+ "A,5", "B,5", "C,8", "A,8",
+ "D,8", "A,9"});
+
+ }
+
+ // Removes 'originput' and 'aout' after each test.
+ // NOTE(review): 'originput2', 'bout' and 'cout', created by the
+ // testSchemaConversion* tests below, are not cleaned up here — leftover
+ // output may cause STORE failures on rerun; confirm intended.
+ @After
+ @Override
+ protected void tearDown() throws Exception {
+ Util.deleteFile(cluster, "originput");
+ String aoutPath = FileLocalizer.fullPath("aout", pig.getPigContext());
+ if (FileLocalizer.fileExists(aoutPath, pig.getPigContext())) {
+ FileLocalizer.delete(aoutPath, pig.getPigContext());
+ }
+ }
+
+ // Round-trips a declared schema through STORE/LOAD with PigStorageSchema:
+ // the schema written alongside 'aout' must equal the original, and an
+ // explicit AS clause on a later LOAD must take precedence over it.
+ @Test
+ public void testPigStorageSchema() throws Exception {
+ pigContext.connect();
+ String query = "a = LOAD 'originput' using org.apache.pig.piggybank.storage.PigStorageSchema() as (f1:chararray, f2:int);";
+ pig.registerQuery(query);
+ Schema origSchema = pig.dumpSchema("a");
+ pig.registerQuery("STORE a into 'aout' using org.apache.pig.piggybank.storage.PigStorageSchema();");
+
+ // 'aout' now has a schema.
+
+ // Verify that loading 'aout' with no given schema produces
+ // the original schema.
+
+ pig.registerQuery("b = LOAD 'aout' using org.apache.pig.piggybank.storage.PigStorageSchema();");
+ Schema genSchema = pig.dumpSchema("b");
+ assertTrue("generated schema equals original" , Schema.equals(genSchema, origSchema, true, false));
+
+ // Verify that giving our own schema works
+ // NOTE(review): this local shadows the field 'aliases' declared above.
+ String [] aliases ={"foo", "bar"};
+ byte[] types = {DataType.INTEGER, DataType.LONG};
+ Schema newSchema = TypeCheckingTestUtil.genFlatSchema(
+ aliases,types);
+ pig.registerQuery("c = LOAD 'aout' using org.apache.pig.piggybank.storage.PigStorageSchema() as (foo:int, bar:long);");
+ Schema newGenSchema = pig.dumpSchema("c");
+ assertTrue("explicit schema overrides metadata", Schema.equals(newSchema, newGenSchema, true, false));
+
+ }
+
+ // Round-trips a grouped (bag-valued) schema through STORE/LOAD as
+ // ResourceSchema, then re-loads the stored output with an explicit bag
+ // schema and checks per-group sums via SUM().
+ @Test
+ public void testSchemaConversion() throws Exception {
+
+ Util.createInputFile(cluster, "originput2",
+ new String[] {"1", "2", "3", "2",
+ "5", "5", "8", "8",
+ "8", "9"});
+
+ pig.registerQuery("A = LOAD 'originput2' using org.apache.pig.piggybank.storage.PigStorageSchema() as (f:int);");
+ pig.registerQuery("B = group A by f;");
+ Schema origSchema = pig.dumpSchema("B");
+ ResourceSchema rs1 = new ResourceSchema(origSchema);
+ pig.registerQuery("STORE B into 'bout' using org.apache.pig.piggybank.storage.PigStorageSchema();");
+
+ pig.registerQuery("C = LOAD 'bout' using org.apache.pig.piggybank.storage.PigStorageSchema();");
+ Schema genSchema = pig.dumpSchema("C");
+ ResourceSchema rs2 = new ResourceSchema(genSchema);
+ assertTrue("generated schema equals original" , ResourceSchema.equals(rs1, rs2));
+
+ pig.registerQuery("C1 = LOAD 'bout' as (a0:int, A: {t: (f:int) } );");
+ pig.registerQuery("D = foreach C1 generate a0, SUM(A);");
+
+ List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "(1,1L)",
+ "(2,4L)",
+ "(3,3L)",
+ "(5,10L)",
+ "(8,24L)",
+ "(9,9L)"
+ });
+
+ Iterator<Tuple> iter = pig.openIterator("D");
+ int counter = 0;
+ while (iter.hasNext()) {
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
+ }
+
+ // Guard against a silently empty iterator: every expected tuple must
+ // have been seen.
+ assertEquals(expectedResults.size(), counter);
+ }
+
+ // Same round-trip as testSchemaConversion but grouping on a chararray key
+ // and using a custom field delimiter (',') on the initial LOAD.
+ @Test
+ public void testSchemaConversion2() throws Exception {
+
+ pig.registerQuery("A = LOAD 'originput' using org.apache.pig.piggybank.storage.PigStorageSchema(',') as (f1:chararray, f2:int);");
+ pig.registerQuery("B = group A by f1;");
+ Schema origSchema = pig.dumpSchema("B");
+ ResourceSchema rs1 = new ResourceSchema(origSchema);
+ pig.registerQuery("STORE B into 'cout' using org.apache.pig.piggybank.storage.PigStorageSchema();");
+
+ pig.registerQuery("C = LOAD 'cout' using org.apache.pig.piggybank.storage.PigStorageSchema();");
+ Schema genSchema = pig.dumpSchema("C");
+ ResourceSchema rs2 = new ResourceSchema(genSchema);
+ assertTrue("generated schema equals original" , ResourceSchema.equals(rs1, rs2));
+
+ pig.registerQuery("C1 = LOAD 'cout' as (a0:chararray, A: {t: (f1:chararray, f2:int) } );");
+ pig.registerQuery("D = foreach C1 generate a0, SUM(A.f2);");
+
+ List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "('A',23L)",
+ "('B',7L)",
+ "('C',11L)",
+ "('D',10L)"
+ });
+
+ Iterator<Tuple> iter = pig.openIterator("D");
+ int counter = 0;
+ while (iter.hasNext()) {
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
+ }
+
+ // Guard against a silently empty iterator: every expected tuple must
+ // have been seen.
+ assertEquals(expectedResults.size(), counter);
+ }
+
+}