You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2009/10/30 21:23:56 UTC
svn commit: r831443 [4/4] - in /hadoop/pig/branches/load-store-redesign: ./
lib-src/bzip2/org/apache/tools/bzip2r/ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ ...
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMergeJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMergeJoin.java?rev=831443&r1=831442&r2=831443&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMergeJoin.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMergeJoin.java Fri Oct 30 20:23:54 2009
@@ -19,18 +19,23 @@
import java.io.IOException;
import java.util.Iterator;
+import java.util.Map;
import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
import org.apache.pig.ExecType;
+import org.apache.pig.IndexableLoadFunc;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
+import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.LogUtils;
@@ -49,6 +54,7 @@
public TestMergeJoin() throws ExecException, IOException{
pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ // All tests in this class run in MAPREDUCE mode, configured from the test cluster's properties.
}
/**
* @throws java.lang.Exception
@@ -456,6 +462,58 @@
}
+ /**
+  * Verifies that a merge join on expression keys ({@code $0 + 10}) returns
+  * the same rows as a regular join on the same keys. The right-hand input
+  * is split across several files whose contents are in ascending key order.
+  * The input files are always removed afterwards, even if the test fails.
+  */
@Test
+ public void testExpression() throws IOException{
+     Util.createInputFile(cluster, "temp_file1", new String[]{"8", "9"});
+     Util.createInputFile(cluster, "temp_file2", new String[]{"10", "11"});
+     Util.createInputFile(cluster, "temp_file3", new String[]{"20"});
+     Util.createInputFile(cluster, "leftinput", new String[] { "9", "11"});
+     try {
+         pigServer.registerQuery("A = LOAD 'leftinput' as (a:int);");
+         pigServer.registerQuery("B = LOAD 'temp_file*' as (a:int);");
+
+         // Rows produced by the merge join.
+         DataBag dbmrj = BagFactory.getInstance().newDefaultBag();
+         pigServer.registerQuery("C = join A by $0 + 10, B by $0 + 10 using \"merge\";");
+         Iterator<Tuple> iter = pigServer.openIterator("C");
+         while(iter.hasNext()) {
+             dbmrj.add(iter.next());
+         }
+
+         // Reference rows from a default join on the same keys.
+         DataBag dbshj = BagFactory.getInstance().newDefaultBag();
+         pigServer.registerQuery("C = join A by $0 + 10, B by $0 + 10;");
+         iter = pigServer.openIterator("C");
+         while(iter.hasNext()) {
+             dbshj.add(iter.next());
+         }
+
+         Assert.assertEquals(dbmrj.size(), dbshj.size());
+         Assert.assertTrue(TestHelper.compareBags(dbmrj, dbshj));
+     } finally {
+         // Clean up unconditionally so a failed query or assertion does not
+         // leave stale input files on the cluster for later tests.
+         Util.deleteFile(cluster, "temp_file1");
+         Util.deleteFile(cluster, "temp_file2");
+         Util.deleteFile(cluster, "temp_file3");
+         Util.deleteFile(cluster, "leftinput");
+     }
+ }
+
+ /**
+  * Verifies that a merge join with expression keys is rejected with error
+  * code 1106 when the right-hand input is loaded through an
+  * {@link IndexableLoadFunc} ({@link DummyIndexableLoader}).
+  * No input files are created: the query is presumably rejected before any
+  * data is read.
+  */
+ @Test
+ public void testExpressionFail() throws IOException{
+     pigServer.registerQuery("A = LOAD 'leftinput' as (a:int);");
+     pigServer.registerQuery("B = LOAD 'temp_file*' using " +
+             DummyIndexableLoader.class.getName() + "() as (a:int);");
+     try {
+         pigServer.registerQuery("C = join A by $0 + 10, B by $0 + 10 using \"merge\";");
+         pigServer.openIterator("C");
+         // Reaching this point means the invalid join was accepted.
+         // Assert.fail throws an Error, so the catch below does not swallow it.
+         Assert.fail("Expected merge join on expression keys to fail with error 1106");
+     } catch (Exception e) {
+         // The failure may arrive wrapped; extract the PigException it carries.
+         PigException pe = LogUtils.getPigException(e);
+         Assert.assertNotNull("Expected a PigException behind: " + e, pe);
+         Assert.assertEquals(1106, pe.getErrorCode());
+     }
+ }
+
+ @Test
public void testMergeJoinSch1() throws IOException{
pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
@@ -478,4 +536,158 @@
shjSch = pigServer.dumpSchema("C");
Assert.assertTrue(shjSch == null);
}
+
+
+ /**
+  * Stub {@link IndexableLoadFunc} used by testExpressionFail to check that
+  * expressions are not accepted as merge join keys when the right-hand
+  * input's loader implements {@link IndexableLoadFunc}.
+  * Every method is a placeholder: void methods do nothing and value-returning
+  * methods return {@code null}.
+  */
+ public static class DummyIndexableLoader implements IndexableLoadFunc {
+
+     /**
+      * No-arg constructor; the test query names this class (with "()") as
+      * its loader, so it is instantiated from the class name.
+      */
+     public DummyIndexableLoader() {
+     }
+
+     // ---------------- IndexableLoadFunc ----------------
+
+     /** Stub: ignores the configuration. */
+     @Override
+     public void initialize(Configuration conf) throws IOException {
+     }
+
+     /** Stub: does not seek anywhere. */
+     @Override
+     public void seekNear(Tuple keys) throws IOException {
+     }
+
+     /** Stub: holds no resources, so there is nothing to release. */
+     @Override
+     public void close() throws IOException {
+     }
+
+     // ---------------- LoadFunc ----------------
+
+     /** Stub: ignores the stream and split boundaries. */
+     @Override
+     public void bindTo(String fileName, BufferedPositionedInputStream is,
+             long offset, long end) throws IOException {
+     }
+
+     /** Stub: produces no tuples; always returns {@code null}. */
+     @Override
+     public Tuple getNext() throws IOException {
+         return null;
+     }
+
+     /** Stub: reports no schema; always returns {@code null}. */
+     @Override
+     public Schema determineSchema(String fileName, ExecType execType,
+             DataStorage storage) throws IOException {
+         return null;
+     }
+
+     /** Stub: ignores the requested projection. */
+     @Override
+     public void fieldsToRead(Schema schema) {
+     }
+
+     // Byte conversions: none are implemented; each returns null.
+
+     @Override
+     public Integer bytesToInteger(byte[] b) throws IOException {
+         return null;
+     }
+
+     @Override
+     public Long bytesToLong(byte[] b) throws IOException {
+         return null;
+     }
+
+     @Override
+     public Float bytesToFloat(byte[] b) throws IOException {
+         return null;
+     }
+
+     @Override
+     public Double bytesToDouble(byte[] b) throws IOException {
+         return null;
+     }
+
+     @Override
+     public String bytesToCharArray(byte[] b) throws IOException {
+         return null;
+     }
+
+     @Override
+     public Map<String, Object> bytesToMap(byte[] b) throws IOException {
+         return null;
+     }
+
+     @Override
+     public Tuple bytesToTuple(byte[] b) throws IOException {
+         return null;
+     }
+
+     @Override
+     public DataBag bytesToBag(byte[] b) throws IOException {
+         return null;
+     }
+ }
}