You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2009/11/24 18:50:58 UTC

svn commit: r883800 - in /hadoop/pig/trunk/contrib/zebra: CHANGES.txt src/java/org/apache/hadoop/zebra/io/ColumnGroup.java src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java src/test/org/apache/hadoop/zebra/pig/TestMergeJoinEmpty.java

Author: gates
Date: Tue Nov 24 17:50:58 2009
New Revision: 883800

URL: http://svn.apache.org/viewvc?rev=883800&view=rev
Log:
PIG_1078: merge join with empty table failed.

Added:
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoinEmpty.java
Modified:
    hadoop/pig/trunk/contrib/zebra/CHANGES.txt
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java

Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=883800&r1=883799&r2=883800&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Tue Nov 24 17:50:58 2009
@@ -28,6 +28,7 @@
   OPTIMIZATIONS
 
   BUG FIXES
+    PIG_1078: merge join with empty table failed (yanz via gates)
 
     PIG-1091: Exception when load with projection of map keys on a map column
 	that is not map split (yanz via gates).

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java?rev=883800&r1=883799&r2=883800&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java Tue Nov 24 17:50:58 2009
@@ -509,14 +509,16 @@
       }
 
       BlockDistribution ret = new BlockDistribution();
-      CGIndexEntry entry = cgindex.get(split.fileIndex);
-      FileStatus tfileStatus = fs.getFileStatus(new Path(path, entry.getName())); 
-      
-      BlockLocation[] locations = fs.getFileBlockLocations(tfileStatus, split.startByte, split.numBytes);
-      for (BlockLocation l : locations) {
-        ret.add(l);
-      }
-      
+      if (split.fileIndex >= 0)
+      {
+        CGIndexEntry entry = cgindex.get(split.fileIndex);
+        FileStatus tfileStatus = fs.getFileStatus(new Path(path, entry.getName())); 
+        
+        BlockLocation[] locations = fs.getFileBlockLocations(tfileStatus, split.startByte, split.numBytes);
+        for (BlockLocation l : locations) {
+          ret.add(l);
+        }
+      } 
       return ret;
     }
 
@@ -530,6 +532,9 @@
     void fillRowSplit(CGRowSplit rowSplit, long startOffset, long length) 
                       throws IOException {
 
+      if (rowSplit.fileIndex < 0)
+        return;
+
       Path tfPath = new Path(path, cgindex.get(rowSplit.fileIndex).getName());
       FileStatus tfile = fs.getFileStatus(tfPath);
 
@@ -748,7 +753,6 @@
         long length = lengths[i];
         Path path = paths[i];
         int idx = cgindex.getFileIndex(path);        
-        
         lst.add(new CGRowSplit(idx, start, length));
       }
       
@@ -1061,6 +1065,10 @@
         if (!isSorted()) {
           throw new IOException("Cannot seek in unsorted Column Gruop");
         }
+        if (atEnd())
+        {
+          return false;
+        }
         int index =
             cgindex.lowerBound(new ByteArray(key.get(), 0, key.getSize()),
                 comparator);
@@ -1764,6 +1772,8 @@
     
     int getFileIndex(Path path) throws IOException {
       String filename = path.getName();
+      if (index.isEmpty())
+        return -1;
       for (CGIndexEntry cgie : index) {
         if (cgie.getName().equals(filename)) {
           return cgie.getIndex(); 

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java?rev=883800&r1=883799&r2=883800&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java Tue Nov 24 17:50:58 2009
@@ -486,11 +486,6 @@
         bd = BlockDistribution.sum(bd, reader.getBlockDistribution((RangeSplit)null));
       }
       
-      if (bd == null) {
-        // should never happen.
-        return new InputSplit[0];
-      }
-      
       SortedTableSplit split = new SortedTableSplit(null, null, bd, conf);
       return new InputSplit[] { split };
     }
@@ -509,7 +504,8 @@
     
     if (keyDistri == null) {
       // should never happen.
-      return new InputSplit[0];
+      SortedTableSplit split = new SortedTableSplit(null, null, null, conf);
+      return new InputSplit[] { split };
     }
     
     if (numSplits > 0) {
@@ -571,13 +567,20 @@
 
     ArrayList<InputSplit> ret = new ArrayList<InputSplit>();
     if (numSplits <= 0) {
-      for (int i = 0; i < readers.size(); ++i) {
-        BasicTable.Reader reader = readers.get(i);
-        List<RangeSplit> subSplits = reader.rangeSplit(-1);
-        for (Iterator<RangeSplit> it = subSplits.iterator(); it.hasNext();) {
-          UnsortedTableSplit split =
+      if (totalBytes <= 0) {
+        BasicTable.Reader reader = readers.get(0);
+        UnsortedTableSplit split =
+          new UnsortedTableSplit(reader, null, conf);
+        ret.add(split);
+      } else {
+        for (int i = 0; i < readers.size(); ++i) {
+          BasicTable.Reader reader = readers.get(i);
+          List<RangeSplit> subSplits = reader.rangeSplit(-1);
+          for (Iterator<RangeSplit> it = subSplits.iterator(); it.hasNext();) {
+            UnsortedTableSplit split =
               new UnsortedTableSplit(reader, it.next(), conf);
-          ret.add(split);
+            ret.add(split);
+          }
         }
       }
     } else {
@@ -700,14 +703,8 @@
           new BasicTable.Reader(leaf.getPath(), conf);
         reader.setProjection(leaf.getProjection());
         BasicTableStatus s = reader.getStatus();
-        if ((s.getRows() != 0) && (s.getSize() > 0)) {
-          // skipping empty tables.
-          // getRows() may return -1 to indicate unknown row numbers.
-          readers.add(reader);
-          status.add(s);
-        } else {
-          reader.close();
-        }
+        readers.add(reader);
+        status.add(s);
       }
       
       if (readers.isEmpty()) {
@@ -795,6 +792,12 @@
 
   @Override
   public String[] getLocations() throws IOException {
+    if (hosts == null)
+    {
+      String[] tmp = new String[1];
+      tmp[0] = "";
+      return tmp;
+    }
     return hosts;
   }
 
@@ -884,6 +887,12 @@
 
   @Override
   public String[] getLocations() throws IOException {
+    if (hosts == null)
+    {
+      String[] tmp = new String[1];
+      tmp[0] = "";
+      return tmp;
+    }
     return hosts;
   }
 

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoinEmpty.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoinEmpty.java?rev=883800&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoinEmpty.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoinEmpty.java Tue Nov 24 17:50:58 2009
@@ -0,0 +1,464 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+import org.apache.hadoop.zebra.pig.TableStorer;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestMergeJoinEmpty {
+  protected static ExecType execType = ExecType.MAPREDUCE;
+  private static MiniCluster cluster;
+  protected static PigServer pigServer;
+  private static Configuration conf;
+  private static FileSystem fs;
+  final static int numsBatch = 4;
+  final static int numsInserters = 1;
+  static Path pathWorking;
+  static Path pathTable1;
+  static Path pathTable2;
+  final static String STR_SCHEMA1 = "a:int,b:float,c:long,d:double,e:string,f:bytes,r1:record(f1:string, f2:string),m1:map(string)";
+  final static String STR_SCHEMA2 = "m1:map(string),r1:record(f1:string, f2:string),f:bytes,e:string,d:double,c:long,b:float,a:int";
+
+  final static String STR_STORAGE1 = "[a, b, c]; [e, f]; [r1.f1]; [m1#{a}]";
+  final static String STR_STORAGE2 = "[a];[b]; [c]; [e]; [f]; [r1.f1]; [m1#{a}]";
+  static int t1 =0;
+ 
+  @BeforeClass
+  public static void setUp() throws Exception {
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // getAbsolutePath();
+      System
+          .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+    }
+
+    if (execType == ExecType.MAPREDUCE) {
+      cluster = MiniCluster.buildCluster();
+      pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    } else {
+      pigServer = new PigServer(ExecType.LOCAL);
+    }
+   
+   
+    fs = cluster.getFileSystem();
+   
+
+ conf = new Configuration();
+  
+    
+    pathWorking = fs.getWorkingDirectory();
+    pathTable1 = new Path(pathWorking, "table1");
+    pathTable2 = new Path(pathWorking, "table2");
+    System.out.println("pathTable1 =" + pathTable1);
+    createFirstTable();
+    createSecondTable();
+  }
+  public static void createFirstTable() throws IOException, ParseException {
+    BasicTable.Writer writer = new BasicTable.Writer(pathTable1, STR_SCHEMA1,
+        STR_STORAGE1, conf);
+    Schema schema = writer.getSchema();
+    //System.out.println("typeName" + schema.getColumn("a").type.pigDataType());
+    Tuple tuple = TypesUtils.createTuple(schema);
+  
+    TableInserter[] inserters = new TableInserter[numsInserters];
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i] = writer.getInserter("ins" + i, false);
+    }
+    Tuple tupRecord1;
+    try {
+      tupRecord1 = TypesUtils.createTuple(schema.getColumnSchema("r1")
+          .getSchema());
+    } catch (ParseException e) {
+      e.printStackTrace();
+      throw new IOException(e);
+    }
+    Map<String, String> m1 = new HashMap<String, String>();
+    for (int b = 0; b < numsBatch; b++) {
+      for (int i = 0; i < numsInserters; i++) {
+        TypesUtils.resetTuple(tupRecord1);
+        TypesUtils.resetTuple(tuple);
+        m1.clear();
+  
+        try {
+          // first row of the table , the biggest row
+          if (i == 0 && b == 0) {
+            tuple.set(0, 100);
+            tuple.set(1, 100.1f);
+            tuple.set(2, 100L);
+            tuple.set(3, 50e+2);
+            tuple.set(4, "something");
+            tuple.set(5, new DataByteArray("something"));
+  
+          }
+          // the middle + 1 row of the table, the smallest row
+          else if (i == 0 && b == (numsBatch / 2)) {
+            tuple.set(0, -100);
+            tuple.set(1, -100.1f);
+            tuple.set(2, -100L);
+            tuple.set(3, -50e+2);
+            tuple.set(4, "so");
+            tuple.set(5, new DataByteArray("so"));
+  
+          }
+  
+          else {
+            Float f = 1.1f;
+            long l = 11;
+            double d = 1.1;
+            tuple.set(0, b);
+            tuple.set(1, f);
+            tuple.set(2, l);
+            tuple.set(3, d);
+            tuple.set(4, "some");
+            tuple.set(5, new DataByteArray("some"));
+          }
+  
+          // insert record
+          tupRecord1.set(0, "" + b);
+          tupRecord1.set(1, "" + b);
+          tuple.set(6, tupRecord1);
+  
+          // insert map
+          m1.put("a", "" + b);
+          m1.put("b", "" + b);
+          m1.put("c", "" + b);
+          tuple.set(7, m1);
+  
+        } catch (ExecException e) {
+          e.printStackTrace();
+        }
+  
+       inserters[i].insert(new BytesWritable(("key_" + b).getBytes()), tuple);
+       
+      }
+    }
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i].close();
+    }
+    writer.close();
+    
+  
+    //check table is setup correctly
+    String projection = new String("a,b,c,d,e,f,r1,m1");
+    
+    BasicTable.Reader reader = new BasicTable.Reader(pathTable1, conf);
+    reader.setProjection(projection);
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    TableScanner scanner = reader.getScanner(splits.get(0), true);
+    BytesWritable key = new BytesWritable();
+    Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+  
+    scanner.getValue(RowValue);
+    System.out.println("rowvalue size:"+RowValue.size());
+    System.out.println("read a : " + RowValue.get(0).toString());
+    System.out.println("read string: " + RowValue.get(1).toString());
+  
+    scanner.advance();
+    scanner.getValue(RowValue);
+    System.out.println("read float in 2nd row: "+ RowValue.get(1).toString());
+    System.out.println("done insert table");
+  
+    reader.close();
+    
+  }
+  public static void createSecondTable() throws IOException, ParseException {
+    BasicTable.Writer writer = new BasicTable.Writer(pathTable2, STR_SCHEMA2,
+        STR_STORAGE2, conf);
+    Schema schema = writer.getSchema();
+    //System.out.println("typeName" + schema.getColumn("a").type.pigDataType());
+    Tuple tuple = TypesUtils.createTuple(schema);
+
+    TableInserter[] inserters = new TableInserter[numsInserters];
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i] = writer.getInserter("ins" + i, false);
+    }
+    Tuple tupRecord1;
+    try {
+      tupRecord1 = TypesUtils.createTuple(schema.getColumnSchema("r1")
+          .getSchema());
+    } catch (ParseException e) {
+      e.printStackTrace();
+      throw new IOException(e);
+    }
+    Map<String, String> m1 = new HashMap<String, String>();
+    for (int b = 0; b < numsBatch; b++) {
+      for (int i = 0; i < numsInserters; i++) {
+        TypesUtils.resetTuple(tupRecord1);
+        TypesUtils.resetTuple(tuple);
+        m1.clear();
+
+        try {
+           // first row of the table , the biggest row
+          if (i == 0 && b == 0) {
+            tuple.set(7, 100);
+            tuple.set(6, 100.1f);
+            tuple.set(5, 100L);
+            tuple.set(4, 50e+2);
+            tuple.set(3, "something");
+            tuple.set(2, new DataByteArray("something"));
+
+          }
+          // the middle +1 row of the table, the smallest row
+          else if (i == 0 && b == (numsBatch / 2)) {
+            tuple.set(7, -100);
+            tuple.set(6, -100.1f);
+            tuple.set(5, -100L);
+            tuple.set(4, -50e+2);
+            tuple.set(3, "so");
+            tuple.set(2, new DataByteArray("so"));
+
+          }
+
+          else {
+            Float f = 2.1f;
+            long l = 12;
+            double d = 2.1;
+            tuple.set(7, b*2);
+            tuple.set(6, f);
+            tuple.set(5, l);
+            tuple.set(4, d);
+            tuple.set(3, "somee");
+            tuple.set(2, new DataByteArray("somee"));
+          }
+
+          // insert record
+          tupRecord1.set(0, "" + b);
+          tupRecord1.set(1, "" + b);
+          tuple.set(1, tupRecord1);
+
+          // insert map
+
+          m1.put("a", "" + b);
+          m1.put("b", "" + b);
+          m1.put("c", "" + b);
+          tuple.set(0, m1);
+
+        } catch (ExecException e) {
+          e.printStackTrace();
+        }
+//      empty table
+//        inserters[i].insert(new BytesWritable(("key" + b).getBytes()), tuple);
+      }
+    }
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i].close();
+    }
+    writer.close();
+    
+    
+    
+    
+    //check table is setup correctly
+    String projection = new String("a,b,c,d,e,f,r1,m1");
+    
+    BasicTable.Reader reader = new BasicTable.Reader(pathTable2, conf);
+    reader.setProjection(projection);
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    TableScanner scanner = reader.getScanner(splits.get(0), true);
+    BytesWritable key = new BytesWritable();
+    Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+    /*
+    scanner.getValue(RowValue);
+    System.out.println("rowvalue size:"+RowValue.size());
+    System.out.println("read a : " + RowValue.get(7).toString());
+    System.out.println("read string: " + RowValue.get(6).toString());
+   
+    scanner.advance();
+    scanner.getValue(RowValue);
+    System.out.println("read float in 2nd row: "+ RowValue.get(6).toString());
+    System.out.println("done insert table");
+    */
+
+
+    reader.close();
+    
+  }
+  
+  public static void sortTable(Path tablePath, String sortkey){
+    
+  }
+  @AfterClass
+  public static void tearDown() throws Exception {
+    pigServer.shutdown();
+  }
+
+
+  public void verify(Iterator<Tuple> it3) throws ExecException {
+    int row = 0;
+    Tuple RowValue3 = null;
+    Assert.assertEquals(0, row);
+  }
+
+  public Iterator<Tuple> joinTable(String table1, String table2, String sortkey1, String sortkey2) throws IOException {
+    String query1 = "records1 = LOAD '" + this.pathTable1.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    System.out.println("query1:" + query1);
+    pigServer.registerQuery(query1);
+
+    String query2 = "records2 = LOAD '" + this.pathTable2.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    System.out.println("query2:" + query2);
+    pigServer.registerQuery(query2);
+
+   /* Iterator<Tuple> it_before_order = pigServer.openIterator("records");
+    int row_before_order = 0;
+    Tuple RowValue_before_order = null;
+    while (it_before_order.hasNext()) {
+      RowValue_before_order = it_before_order.next();
+      row_before_order++;
+      System.out.println("row : " + row_before_order + " field f value: "
+          + RowValue_before_order.get(5));
+    }
+    System.out.println("total row for orig table before ordered:"
+        + row_before_order);*/
+    String orderby1 = "sort1 = ORDER records1 BY " + sortkey1 + " ;";
+    String orderby2 = "sort2 = ORDER records2 BY " + sortkey2 + " ;";
+    pigServer.registerQuery(orderby1);
+    pigServer.registerQuery(orderby2);
+
+    /*Iterator<Tuple> it_after_order = pigServer.openIterator("srecs");
+    int row_after_order = 0;
+    Tuple RowValue_after_order = null;
+    while (it_after_order.hasNext()) {
+      RowValue_after_order = it_after_order.next();
+      row_after_order++;
+      System.out.println("row : " + row_after_order + " field b value: "
+          + RowValue_after_order.get(1));
+    }
+    System.out.println("total row for orig table after ordered:"
+        + row_after_order);*/
+    // Path newPath = new Path(getCurrentMethodName());
+
+    /*
+     * Table1 creation
+     */
+    this.t1++;
+    
+    String table1path = this.pathTable1.toString() + Integer.toString(this.t1);
+    pigServer.store("sort1", table1path, TableStorer.class.getCanonicalName()
+        + "('[a, b, c]; [d, e, f, r1, m1]')");
+
+    String query3 = "records1 = LOAD '"
+        + table1path
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f, r1, m1', 'sorted');";
+
+    System.out.println("query3:" + query3);
+    pigServer.registerQuery(query3);   
+    
+    String foreach = "records11 = foreach records1 generate a as a, b as b, c as c, d as d, e as e, f as f, r1 as r1, m1 as m1;";
+    pigServer.registerQuery(foreach);
+   /* Iterator<Tuple> it_ordered = pigServer.openIterator("records1");
+    int row_ordered = 0;
+    Tuple RowValue_ordered = null;
+    while (it_ordered.hasNext()) {
+      RowValue_ordered = it_ordered.next();
+      row_ordered++;
+      System.out.println("row : " + row_ordered + " field a value: "
+          + RowValue_ordered.get(0));
+    }
+    System.out.println("total row for table 1 after ordered:" + row_ordered);*/
+
+    /*
+     * Table2 creation
+     */
+    this.t1++;
+    String table2path = this.pathTable2.toString() + Integer.toString(this.t1);
+    pigServer.store("sort2", table2path, TableStorer.class.getCanonicalName()
+        + "('[a, b, c]; [d,e,f,r1,m1]')");
+
+    String query4 = "records2 = LOAD '" + table2path
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    pigServer.registerQuery(query4);
+
+    
+    String filter = "records22 = FILTER records2 BY a == '1.9';";
+    pigServer.registerQuery(filter);
+    /*Iterator<Tuple> it_ordered2 = pigServer.openIterator("records2");
+    int row_ordered2 = 0;
+    Tuple RowValue_ordered2 = null;
+    while (it_ordered2.hasNext()) {
+      RowValue_ordered2 = it_ordered2.next();
+      row_ordered2++;
+      System.out.println("row for table 2 after ordereed: " + row_ordered2
+          + " field a value: " + RowValue_ordered2.get(0));
+    }
+
+    System.out.println("total row for table 2:" + row_ordered2);
+    */
+    String join = "joinRecords = JOIN records11 BY " + "(" + sortkey1 + ")" 
+        + " , records2 BY " + "("+ sortkey2 + ")"+" USING \"merge\";";
+   //TODO: can not use records22
+      pigServer.registerQuery(join);
+   
+    // check JOIN content
+    Iterator<Tuple> it3 = pigServer.openIterator("joinRecords");
+    return it3;
+  }
+  
+  @Test
+  public void test1() throws ExecException, IOException {
+    /*
+     * join key: single byte column
+     */
+    /*
+     * right empty table
+     */
+    Iterator<Tuple> it3 = joinTable(this.pathTable1.toString(), this.pathTable2.toString(), "f","f" );
+    verify(it3);
+  }
+  
+  @Test
+  public void test2() throws ExecException, IOException {
+    /*
+     * join key: single byte column
+     */
+    /*
+     * left empty table
+     */
+    Iterator<Tuple> it3 = joinTable(this.pathTable2.toString(), this.pathTable1.toString(), "f","f" );
+    verify(it3);
+  }
+  
+}