You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2009/01/08 02:18:29 UTC
svn commit: r732581 [2/2] - in /hadoop/pig/branches/types: ./
src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/backend/hadoop/execu...
Added: hadoop/pig/branches/types/test/org/apache/pig/test/TestFRJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestFRJoin.java?rev=732581&view=auto
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestFRJoin.java (added)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestFRJoin.java Wed Jan 7 17:18:29 2009
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Hashtable;
+import java.util.Iterator;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.ExecType;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.test.utils.TestHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestFRJoin extends TestCase{
+ private static final String INPUT_FILE = "testFrJoinInput.txt";
+ private PigServer pigServer;
+ private MiniCluster cluster = MiniCluster.buildCluster();
+ private File tmpFile;
+
+    /**
+     * Builds the test case with a {@link PigServer} running in MAPREDUCE mode,
+     * configured from the properties of the cluster held in {@code cluster}.
+     *
+     * @throws ExecException declared for the PigServer construction
+     * @throws IOException declared for the PigServer construction
+     */
+    public TestFRJoin() throws ExecException, IOException{
+        // Alternative left by the original author for running without the
+        // cluster: new PigServer(ExecType.LOCAL)
+        this.pigServer = new PigServer(ExecType.MAPREDUCE, this.cluster.getProperties());
+    }
+
+    /**
+     * Writes the join input file to the cluster before each test. The file
+     * holds one line per (i, j) pair with i and j ranging over 1..LOOP_SIZE,
+     * each line formatted as i, a tab, then j.
+     *
+     * @throws Exception if the input file cannot be created
+     */
+    @Before
+    public void setUp() throws Exception {
+        int LOOP_SIZE = 2;
+        // One slot per (i, j) pair produced by the nested loops below, so the
+        // array stays large enough whatever LOOP_SIZE is set to.
+        String[] input = new String[LOOP_SIZE*LOOP_SIZE];
+        int k = 0;
+        for(int i = 1; i <= LOOP_SIZE; i++) {
+            String si = i + "";
+            for(int j=1;j<=LOOP_SIZE;j++) {
+                input[k++] = si + "\t" + j;
+            }
+        }
+        Util.createInputFile(cluster, INPUT_FILE, input);
+    }
+
+    /**
+     * Deletes the input file named by {@code INPUT_FILE} from the cluster
+     * after each test.
+     *
+     * @throws Exception if the file cannot be deleted
+     */
+    @After
+    public void tearDown() throws Exception {
+        Util.deleteFile(this.cluster, INPUT_FILE);
+    }
+
+    /**
+     * Fragment-replicate join written as an {@link EvalFunc}. The first call
+     * to {@link #exec(Tuple)} loads the whole replicated file into an
+     * in-memory table keyed by the join field; every call then looks up the
+     * input tuple's key in that table and returns the matching tuples as a bag.
+     */
+    public static class FRJoin extends EvalFunc<DataBag>{
+        /** Location of the replicated file that is loaded into memory. */
+        String repl;
+        /** Index of the join key, used for both the input and the replicated tuples. */
+        int keyField;
+        /** Set once {@link #replTbl} has been filled from the replicated file. */
+        boolean isTblSetUp = false;
+        /** Maps a join key to the bag of (key, value) tuples read for that key. */
+        Hashtable<String, DataBag> replTbl = new Hashtable<String, DataBag>();
+
+        /** Creates an instance with no replicated file configured. */
+        public FRJoin(){
+
+        }
+
+        /**
+         * @param repl location of the replicated file to join against
+         */
+        public FRJoin(String repl){
+            this.repl = repl;
+        }
+
+        /**
+         * Returns the replicated tuples whose key equals the key field of
+         * {@code input}, or a new empty bag when no tuple has that key.
+         *
+         * @param input tuple whose field at {@code keyField} is the String join key
+         * @return the bag stored for the key, or an empty bag if there is none
+         * @throws IOException if the replicated file cannot be loaded or the
+         *         key field cannot be read; the underlying ExecException is
+         *         attached as the cause
+         */
+        @Override
+        public DataBag exec(Tuple input) throws IOException {
+            if(!isTblSetUp){
+                setUpHashTable();
+                isTblSetUp = true;
+            }
+            try {
+                String key = (String) input.get(keyField);
+                // Hashtable never stores null values, so null means "no such key".
+                DataBag matches = replTbl.get(key);
+                if(matches == null) return BagFactory.getInstance().newDefaultBag();
+                return matches;
+            } catch (ExecException e) {
+                throw toIOException(e);
+            }
+
+        }
+
+        /**
+         * Reads the replicated file with PigStorage and groups its tuples by
+         * key in {@link #replTbl}. Each stored tuple has two fields: the key
+         * converted to a String and field 1 converted to an Integer.
+         *
+         * @throws IOException if loading fails; the underlying ExecException
+         *         is attached as the cause
+         */
+        private void setUpHashTable() throws IOException {
+            FileSpec replFile = new FileSpec(repl,new FuncSpec(PigStorage.class.getName()+"()"));
+            POLoad ld = new POLoad(new OperatorKey("Repl File Loader", 1L), replFile, false);
+            PigContext pc = new PigContext(ExecType.MAPREDUCE,ConfigurationUtil.toProperties(PigMapReduce.sJobConf));
+            try {
+                pc.connect();
+
+                ld.setPc(pc);
+                // The converter depends only on the load spec, so it is created
+                // once here rather than once per tuple.
+                LoadFunc lf = ((LoadFunc)pc.instantiateFuncFromSpec(ld.getLFile().getFuncSpec()));
+                Tuple dummyTuple = null;
+                for(Result res=ld.getNext(dummyTuple);res.returnStatus!=POStatus.STATUS_EOP;res=ld.getNext(dummyTuple)){
+                    Tuple tup = (Tuple) res.result;
+                    String key = lf.bytesToCharArray(((DataByteArray)tup.get(keyField)).get());
+                    Tuple csttup = TupleFactory.getInstance().newTuple(2);
+                    csttup.set(0, key);
+                    csttup.set(1, lf.bytesToInteger(((DataByteArray)tup.get(1)).get()));
+                    DataBag vals = replTbl.get(key);
+                    if(vals == null){
+                        vals = BagFactory.getInstance().newDefaultBag();
+                        replTbl.put(key, vals);
+                    }
+                    vals.add(csttup);
+                }
+            } catch (ExecException e) {
+                throw toIOException(e);
+            }
+        }
+
+        /**
+         * Converts an ExecException into an IOException with the same message,
+         * keeping the original exception as the cause so its stack trace is
+         * not lost.
+         */
+        private static IOException toIOException(ExecException e) {
+            IOException wrapped = new IOException(e.getMessage());
+            wrapped.initCause(e);
+            return wrapped;
+        }
+
+    }
+
+ @Test
+ public void testUDFFRJ() throws IOException {
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:chararray,y:int);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:chararray,y:int);");
+
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+ {
+ String fSpec = FRJoin.class.getName()+ "('" + INPUT_FILE + "')";
+ pigServer.registerFunction("FRJ", new FuncSpec(fSpec));
+ pigServer.registerQuery("C = foreach A generate *, flatten(FRJ(*));");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ dbfrj.add(iter.next());
+ }
+ }
+ {
+ pigServer.registerQuery("C = join A by $0, B by $0;");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ dbshj.add(iter.next());
+ }
+ }
+ Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
+ Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+ }
+
+ @Test
+ public void testFRJoinOut1() throws IOException{
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("C = join A by $0, B by $0 using \"replicated\";");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ dbfrj.add(iter.next());
+ }
+ }
+ {
+ pigServer.registerQuery("C = join A by $0, B by $0;");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ dbshj.add(iter.next());
+ }
+ }
+ Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
+ Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+ }
+
+ @Test
+ public void testFRJoinOut2() throws IOException{
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "';");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "';");
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("C = join A by $0, B by $0 using \"replicated\";");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ dbfrj.add(iter.next());
+ }
+ }
+ {
+ pigServer.registerQuery("C = join A by $0, B by $0;");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ dbshj.add(iter.next());
+ }
+ }
+ Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
+ Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+ }
+
+ @Test
+ public void testFRJoinOut3() throws IOException{
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+ pigServer.registerQuery("C = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("D = join A by $0, B by $0, C by $0 using \"replicated\";");
+ Iterator<Tuple> iter = pigServer.openIterator("D");
+
+ while(iter.hasNext()) {
+ dbfrj.add(iter.next());
+ }
+ }
+ {
+ pigServer.registerQuery("D = join A by $0, B by $0, C by $0;");
+ Iterator<Tuple> iter = pigServer.openIterator("D");
+
+ while(iter.hasNext()) {
+ dbshj.add(iter.next());
+ }
+ }
+ Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
+ Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+ }
+
+ @Test
+ public void testFRJoinOut4() throws IOException{
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "';");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "';");
+ pigServer.registerQuery("C = LOAD '" + INPUT_FILE + "';");
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("D = join A by $0, B by $0, C by $0 using \"replicated\";");
+ Iterator<Tuple> iter = pigServer.openIterator("D");
+
+ while(iter.hasNext()) {
+ dbfrj.add(iter.next());
+ }
+ }
+ {
+ pigServer.registerQuery("D = join A by $0, B by $0, C by $0;");
+ Iterator<Tuple> iter = pigServer.openIterator("D");
+
+ while(iter.hasNext()) {
+ dbshj.add(iter.next());
+ }
+ }
+ Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
+ Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+ }
+
+
+
+ @Test
+ public void testFRJoinOut5() throws IOException{
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("C = join A by ($0,$1), B by ($0,$1) using \"replicated\";");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ dbfrj.add(iter.next());
+ }
+ }
+ {
+ pigServer.registerQuery("C = join A by ($0,$1), B by ($0,$1);");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ dbshj.add(iter.next());
+ }
+ }
+ Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
+ Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+ }
+
+ @Test
+ public void testFRJoinOut6() throws IOException{
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "';");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "';");
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("C = join A by ($0,$1), B by ($0,$1) using \"replicated\";");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ dbfrj.add(iter.next());
+ }
+ }
+ {
+ pigServer.registerQuery("C = join A by ($0,$1), B by ($0,$1);");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ dbshj.add(iter.next());
+ }
+ }
+ Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
+ Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+ }
+
+ @Test
+ public void testFRJoinOut7() throws IOException{
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("C = join A by $0, B by $0 using \"replicated\";");
+ pigServer.registerQuery("D = join A by $1, B by $1 using \"replicated\";");
+ pigServer.registerQuery("E = union C,D;");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+
+ while(iter.hasNext()) {
+ dbfrj.add(iter.next());
+ }
+ }
+ {
+ pigServer.registerQuery("C = join A by $0, B by $0;");
+ pigServer.registerQuery("D = join A by $1, B by $1;");
+ pigServer.registerQuery("E = union C,D;");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+
+ while(iter.hasNext()) {
+ dbshj.add(iter.next());
+ }
+ }
+ Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
+ Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+ }
+
+ @Test
+ public void testFRJoinSch1() throws IOException{
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+ Schema frjSch = null, shjSch = null;
+ pigServer.registerQuery("C = join A by $0, B by $0 using \"repl\";");
+ frjSch = pigServer.dumpSchema("C");
+ pigServer.registerQuery("C = join A by $0, B by $0;");
+ shjSch = pigServer.dumpSchema("C");
+ Assert.assertEquals(true, shjSch.equals(frjSch));
+ }
+
+ @Test
+ public void testFRJoinSch2() throws IOException{
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "';");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "';");
+ Schema frjSch = null, shjSch = null;
+ pigServer.registerQuery("C = join A by $0, B by $0 using \"repl\";");
+ frjSch = pigServer.dumpSchema("C");
+ pigServer.registerQuery("C = join A by $0, B by $0;");
+ shjSch = pigServer.dumpSchema("C");
+ Assert.assertEquals(true, shjSch.equals(frjSch));
+ }
+
+ @Test
+ public void testFRJoinSch3() throws IOException{
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+ pigServer.registerQuery("C = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+ Schema frjSch = null, shjSch = null;
+ pigServer.registerQuery("D = join A by $0, B by $0, C by $0 using \"repl\";");
+ frjSch = pigServer.dumpSchema("D");
+ pigServer.registerQuery("D = join A by $0, B by $0, C by $0;");
+ shjSch = pigServer.dumpSchema("D");
+ Assert.assertEquals(true, shjSch.equals(frjSch));
+ }
+
+ @Test
+ public void testFRJoinSch4() throws IOException{
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "';");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "';");
+ pigServer.registerQuery("C = LOAD '" + INPUT_FILE + "';");
+ Schema frjSch = null, shjSch = null;
+ pigServer.registerQuery("D = join A by $0, B by $0, C by $0 using \"repl\";");
+ frjSch = pigServer.dumpSchema("D");
+ pigServer.registerQuery("D = join A by $0, B by $0, C by $0;");
+ shjSch = pigServer.dumpSchema("D");
+ Assert.assertEquals(true, shjSch.equals(frjSch));
+ }
+
+ @Test
+ public void testFRJoinSch5() throws IOException{
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+ Schema frjSch = null, shjSch = null;
+ pigServer.registerQuery("C = join A by ($0,$1), B by ($0,$1) using \"repl\";");
+ frjSch = pigServer.dumpSchema("C");
+ pigServer.registerQuery("C = join A by ($0,$1), B by ($0,$1);");
+ shjSch = pigServer.dumpSchema("C");
+ Assert.assertEquals(true, shjSch.equals(frjSch));
+ }
+
+ @Test
+ public void testFRJoinSch6() throws IOException{
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "';");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "';");
+ Schema frjSch = null, shjSch = null;
+ pigServer.registerQuery("C = join A by ($0,$1), B by ($0,$1) using \"repl\";");
+ frjSch = pigServer.dumpSchema("C");
+ pigServer.registerQuery("C = join A by ($0,$1), B by ($0,$1);");
+ shjSch = pigServer.dumpSchema("C");
+ Assert.assertEquals(true, shjSch.equals(frjSch));
+ }
+}