You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ya...@apache.org on 2010/05/21 19:50:16 UTC

svn commit: r947093 - in /hadoop/pig/branches/branch-0.7/contrib/zebra: ./ src/java/org/apache/hadoop/zebra/mapred/ src/test/org/apache/hadoop/zebra/mapred/

Author: yanz
Date: Fri May 21 17:50:16 2010
New Revision: 947093

URL: http://svn.apache.org/viewvc?rev=947093&view=rev
Log:
PIG-1425 support of source table index on unsorted table in the mapred APIs (yanz)

Added:
    hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestUnsortedTableIndex.java   (with props)
Modified:
    hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt
    hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java
    hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java
    hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
    hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
    hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java

Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt?rev=947093&r1=947092&r2=947093&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt Fri May 21 17:50:16 2010
@@ -18,6 +18,8 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
 
+    PIG-1425 support of source table index on unsorted table in the mapred APIs (yanz)
+
     PIG-1361 Zebra TableLoader.getSchema() should return the projectionSchema specified in the constructor of TableLoader instead of pruned proejction by pig (gauravj via daijy)
 
     PIG-1291 Support of virtual column "source_table" on unsorted table (yanz)

Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java?rev=947093&r1=947092&r2=947093&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java Fri May 21 17:50:16 2010
@@ -29,6 +29,8 @@ import org.apache.hadoop.zebra.io.BasicT
 import org.apache.hadoop.zebra.io.TableScanner;
 import org.apache.hadoop.zebra.parser.ParseException;
 import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.pig.data.Tuple;
 
 /**
  * Table expression for reading a BasicTable.
@@ -117,6 +119,12 @@ class BasicTableExpr extends TableExpr {
   } 
 
   @Override
+  public TableScanner getScanner(RowTableSplit split, String projection,
+      Configuration conf) throws IOException, ParseException {
+    return new BasicTableScanner(split, projection, conf);
+  }
+
+  @Override
   public Schema getSchema(Configuration conf) throws IOException {
     return BasicTable.Reader.getSchema(path, conf);
   }
@@ -131,4 +139,75 @@ class BasicTableExpr extends TableExpr {
   {
     BasicTable.dumpInfo(path.toString(), ps, conf, indent);
   }
+
+  /**
+   * Basic Table Scanner
+   */
+  class BasicTableScanner implements TableScanner {
+    private int tableIndex = -1;
+    private Integer[] virtualColumnIndices = null;
+    private TableScanner scanner = null;
+    
+    BasicTableScanner(RowTableSplit split, String projection,
+        Configuration conf) throws IOException, ParseException, ParseException {
+      tableIndex = split.getTableIndex();
+      virtualColumnIndices = Projection.getVirtualColumnIndices(projection);
+      BasicTable.Reader reader =
+        new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), conf);
+      reader.setProjection(projection);
+      scanner = reader.getScanner(true, split.getSplit());
+    }
+    
+    @Override
+    public boolean advance() throws IOException {
+      return scanner.advance();
+    }
+    
+    @Override
+    public boolean atEnd() throws IOException {
+      return scanner.atEnd();
+    }
+    
+    @Override
+    public Schema getSchema() {
+      return scanner.getSchema();
+    }
+    
+    @Override
+    public void getKey(BytesWritable key) throws IOException {
+      scanner.getKey(key);
+    }
+    
+    @Override
+    public void getValue(Tuple row) throws IOException {
+      scanner.getValue(row);
+      if (virtualColumnIndices != null)
+      {
+        for (int i = 0; i < virtualColumnIndices.length; i++)
+        {
+          row.set(virtualColumnIndices[i], tableIndex);
+        }
+      }
+    }
+    
+    @Override
+    public boolean seekTo(BytesWritable key) throws IOException {
+      return scanner.seekTo(key);
+    }
+    
+    @Override
+    public void seekToEnd() throws IOException {
+      scanner.seekToEnd();
+    }
+    
+    @Override 
+    public void close() throws IOException {
+      scanner.close();
+    }
+    
+    @Override
+    public String getProjection() {
+      return scanner.getProjection();
+    }
+  }
 }

Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java?rev=947093&r1=947092&r2=947093&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java Fri May 21 17:50:16 2010
@@ -123,11 +123,8 @@ abstract class TableExpr {
    * @throws IOException
    */
   public TableScanner getScanner(RowTableSplit split, String projection,
-      Configuration conf) throws IOException, ParseException, ParseException {
-    BasicTable.Reader reader =
-        new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), conf);
-    reader.setProjection(projection);
-    return reader.getScanner(true, split.getSplit());
+      Configuration conf) throws IOException, ParseException {
+    return null;
   }
   
   /**

Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java?rev=947093&r1=947092&r2=947093&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java Fri May 21 17:50:16 2010
@@ -235,10 +235,6 @@ public class TableInputFormat implements
    */
   public static void setProjection(JobConf conf, String projection) throws ParseException {
     conf.set(INPUT_PROJ, Schema.normalize(projection));
-
-    // virtual source_table columns require sorted table
-    if (Projection.getVirtualColumnIndices(projection) != null && !getSorted(conf))
-        throw new ParseException("The source_table virtual column is only availabe for sorted table unions.");
   }
   
   /**
@@ -266,10 +262,6 @@ public class TableInputFormat implements
     }
     
     conf.set(INPUT_PROJ, normalizedProjectionString);
-
-    // virtual source_table columns require sorted table
-    if (Projection.getVirtualColumnIndices(projection.toString()) != null && !getSorted(conf))
-      throw new ParseException("The source_table virtual column is only availabe for sorted table unions.");
   }  
 
   /**
@@ -718,6 +710,7 @@ public class TableInputFormat implements
     boolean first = true;
     PathFilter filter = null;
     List<BasicTable.Reader> realReaders = new ArrayList<BasicTable.Reader>();
+    int[] realReaderIndices = new int[readers.size()];
 
     for (int i = 0; i < readers.size(); ++i) {
       BasicTable.Reader reader = readers.get(i);
@@ -727,6 +720,7 @@ public class TableInputFormat implements
       /* We can create input splits only if there does exist a valid column group for split.
        * Otherwise, we do not create input splits. */
       if (splitCGIndex >= 0) {        
+        realReaderIndices[realReaders.size()] = i;
         realReaders.add(reader);
         if (first)
         {
@@ -836,10 +830,10 @@ public class TableInputFormat implements
           batches[++numBatches] = splitLen;
         
         List<RowSplit> subSplits = reader.rowSplit(starts, lengths, paths, splitCGIndex, batches, numBatches);
-    
+        int realTableIndex = realReaderIndices[tableIndex];
         for (Iterator<RowSplit> it = subSplits.iterator(); it.hasNext();) {
           RowSplit subSplit = it.next();
-          RowTableSplit split = new RowTableSplit(reader, subSplit, conf);
+          RowTableSplit split = new RowTableSplit(reader, subSplit, realTableIndex, conf);
           ret.add(split);
         }
       }
@@ -1050,14 +1044,16 @@ class SortedTableSplit implements InputS
  */
 class RowTableSplit implements InputSplit {
   String path = null;
+  int tableIndex;
   RowSplit split = null;
   String[] hosts = null;
   long length = 1;
 
-  public RowTableSplit(Reader reader, RowSplit split, JobConf conf)
+  public RowTableSplit(Reader reader, RowSplit split, int tableIndex, JobConf conf)
       throws IOException {
     this.path = reader.getPath();
     this.split = split;
+    this.tableIndex = tableIndex;
     BlockDistribution dataDist = reader.getBlockDistribution(split);
     if (dataDist != null) {
       length = dataDist.getLength();
@@ -1082,6 +1078,7 @@ class RowTableSplit implements InputSpli
 
   @Override
   public void readFields(DataInput in) throws IOException {
+    tableIndex = WritableUtils.readVInt(in);
     path = WritableUtils.readString(in);
     int bool = WritableUtils.readVInt(in);
     if (bool == 1) {
@@ -1097,6 +1094,7 @@ class RowTableSplit implements InputSpli
 
   @Override
   public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVInt(out, tableIndex);
     WritableUtils.writeString(out, path);
     if (split == null) {
       WritableUtils.writeVInt(out, 0);
@@ -1116,4 +1114,8 @@ class RowTableSplit implements InputSpli
   public RowSplit getSplit() {
     return split;
   }
+  
+  public int getTableIndex() {
+    return tableIndex;
+  }
 }

Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java?rev=947093&r1=947092&r2=947093&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java Fri May 21 17:50:16 2010
@@ -53,8 +53,6 @@ public class TableRecordReader implement
 		  InputSplit split, JobConf conf) throws IOException, ParseException {
 	  if( split instanceof RowTableSplit ) {
 		  RowTableSplit rowSplit = (RowTableSplit)split;
-		  if( Projection.getVirtualColumnIndices( projection ) != null )
-			  throw new IllegalArgumentException("virtual column requires union of multiple sorted tables");
 		  scanner = expr.getScanner(rowSplit, projection, conf);
 	  } else {
 		  SortedTableSplit tblSplit = (SortedTableSplit)split;

Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java?rev=947093&r1=947092&r2=947093&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java Fri May 21 17:50:16 2010
@@ -157,6 +157,13 @@ class TableUnionExpr extends CompositeTa
       throw new IllegalArgumentException("virtual column requires union of multiple tables");
     return new SortedTableUnionScanner(scanners, Projection.getVirtualColumnIndices(projection));
   }
+
+  @Override
+  public TableScanner getScanner(RowTableSplit split, String projection,
+      Configuration conf) throws IOException, ParseException {
+    BasicTableExpr expr = (BasicTableExpr) composite.get(split.getTableIndex());
+    return expr.getScanner(split, projection, conf);
+  }
 }
 
 /**

Added: hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestUnsortedTableIndex.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestUnsortedTableIndex.java?rev=947093&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestUnsortedTableIndex.java (added)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestUnsortedTableIndex.java Fri May 21 17:50:16 2010
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapred;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TestBasicTable;
+import org.apache.hadoop.zebra.mapred.RowTableSplit;
+import org.apache.hadoop.zebra.mapred.TableInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.pig.data.Tuple;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestUnsortedTableIndex {
+	private static Configuration conf;
+	private static JobConf jobConf;
+	private static Path path1, path2;
+
+	@BeforeClass
+	public static void setUpOnce() throws IOException {
+		TestBasicTable.setUpOnce();
+		conf = TestBasicTable.conf;
+    jobConf = new JobConf(conf);
+		path1 = new Path(TestBasicTable.rootPath, "TestUnsortedTableIndex1");
+		path2 = new Path(TestBasicTable.rootPath, "TestUnsortedTableIndex2");
+	}
+
+	@AfterClass
+	public static void tearDown() throws IOException {
+		BasicTable.drop(path1, conf);
+		BasicTable.drop(path2, conf);
+	}
+
+	@Test
+	public void testUnsortedTableIndex() 
+	throws IOException, ParseException, InterruptedException {
+		BasicTable.drop(path1, conf);
+		BasicTable.drop(path2, conf);
+		int total1 = TestBasicTable.createBasicTable(1, 100, "a, b, c, d, e, f", "[a, b]; [c, d]", null, path1, true);    
+		int total2 = TestBasicTable.createBasicTable(1, 100, "a, b, c, d, e, f", "[a, b]; [c, d]", null, path2, true);    
+
+		TableInputFormat inputFormat = new TableInputFormat();
+		TableInputFormat.setInputPaths(jobConf, path1, path2);
+		TableInputFormat.setProjection(jobConf, "source_table");
+		InputSplit[] splits = inputFormat.getSplits(jobConf, -1);
+    Assert.assertEquals(splits.length, 2);
+    for (int i = 0; i < 2; i++)
+    {
+	    int count = 0;
+	  	RowTableSplit split = (RowTableSplit) splits[i];
+	    TableRecordReader rr = (TableRecordReader) inputFormat.getRecordReader(split, jobConf, null);
+	    Tuple t = TypesUtils.createTuple(1);
+      BytesWritable key = new BytesWritable();
+	    while (rr.next(key, t)) {
+        int idx= (Integer) t.get(0);
+        Assert.assertEquals(idx, i);
+	      count++;
+	    }
+	    rr.close();
+      if (i == 0)
+  	    Assert.assertEquals(count, total1);
+      else
+  	    Assert.assertEquals(count, total2);
+    }
+	}
+}

Propchange: hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestUnsortedTableIndex.java
------------------------------------------------------------------------------
    svn:executable = *