You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ya...@apache.org on 2010/05/21 19:50:16 UTC
svn commit: r947093 - in /hadoop/pig/branches/branch-0.7/contrib/zebra: ./
src/java/org/apache/hadoop/zebra/mapred/
src/test/org/apache/hadoop/zebra/mapred/
Author: yanz
Date: Fri May 21 17:50:16 2010
New Revision: 947093
URL: http://svn.apache.org/viewvc?rev=947093&view=rev
Log:
PIG-1425 support of source table index on unsorted table in the mapred APIs (yanz)
Added:
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestUnsortedTableIndex.java (with props)
Modified:
hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java
Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt?rev=947093&r1=947092&r2=947093&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt Fri May 21 17:50:16 2010
@@ -18,6 +18,8 @@ Trunk (unreleased changes)
IMPROVEMENTS
+ PIG-1425 support of source table index on unsorted table in the mapred APIs (yanz)
+
PIG-1361 Zebra TableLoader.getSchema() should return the projectionSchema specified in the constructor of TableLoader instead of pruned projection by pig (gauravj via daijy)
PIG-1291 Support of virtual column "source_table" on unsorted table (yanz)
Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java?rev=947093&r1=947092&r2=947093&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java Fri May 21 17:50:16 2010
@@ -29,6 +29,8 @@ import org.apache.hadoop.zebra.io.BasicT
import org.apache.hadoop.zebra.io.TableScanner;
import org.apache.hadoop.zebra.parser.ParseException;
import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.pig.data.Tuple;
/**
* Table expression for reading a BasicTable.
@@ -117,6 +119,12 @@ class BasicTableExpr extends TableExpr {
}
@Override
+ public TableScanner getScanner(RowTableSplit split, String projection,
+ Configuration conf) throws IOException, ParseException {
+ return new BasicTableScanner(split, projection, conf);
+ }
+
+ @Override
public Schema getSchema(Configuration conf) throws IOException {
return BasicTable.Reader.getSchema(path, conf);
}
@@ -131,4 +139,75 @@ class BasicTableExpr extends TableExpr {
{
BasicTable.dumpInfo(path.toString(), ps, conf, indent);
}
+
+ /**
+ * Basic Table Scanner
+ */
+ class BasicTableScanner implements TableScanner {
+ private int tableIndex = -1;
+ private Integer[] virtualColumnIndices = null;
+ private TableScanner scanner = null;
+
+ BasicTableScanner(RowTableSplit split, String projection,
+ Configuration conf) throws IOException, ParseException, ParseException {
+ tableIndex = split.getTableIndex();
+ virtualColumnIndices = Projection.getVirtualColumnIndices(projection);
+ BasicTable.Reader reader =
+ new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), conf);
+ reader.setProjection(projection);
+ scanner = reader.getScanner(true, split.getSplit());
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ return scanner.advance();
+ }
+
+ @Override
+ public boolean atEnd() throws IOException {
+ return scanner.atEnd();
+ }
+
+ @Override
+ public Schema getSchema() {
+ return scanner.getSchema();
+ }
+
+ @Override
+ public void getKey(BytesWritable key) throws IOException {
+ scanner.getKey(key);
+ }
+
+ @Override
+ public void getValue(Tuple row) throws IOException {
+ scanner.getValue(row);
+ if (virtualColumnIndices != null)
+ {
+ for (int i = 0; i < virtualColumnIndices.length; i++)
+ {
+ row.set(virtualColumnIndices[i], tableIndex);
+ }
+ }
+ }
+
+ @Override
+ public boolean seekTo(BytesWritable key) throws IOException {
+ return scanner.seekTo(key);
+ }
+
+ @Override
+ public void seekToEnd() throws IOException {
+ scanner.seekToEnd();
+ }
+
+ @Override
+ public void close() throws IOException {
+ scanner.close();
+ }
+
+ @Override
+ public String getProjection() {
+ return scanner.getProjection();
+ }
+ }
}
Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java?rev=947093&r1=947092&r2=947093&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java Fri May 21 17:50:16 2010
@@ -123,11 +123,8 @@ abstract class TableExpr {
* @throws IOException
*/
public TableScanner getScanner(RowTableSplit split, String projection,
- Configuration conf) throws IOException, ParseException, ParseException {
- BasicTable.Reader reader =
- new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), conf);
- reader.setProjection(projection);
- return reader.getScanner(true, split.getSplit());
+ Configuration conf) throws IOException, ParseException {
+ return null;
}
/**
Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java?rev=947093&r1=947092&r2=947093&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java Fri May 21 17:50:16 2010
@@ -235,10 +235,6 @@ public class TableInputFormat implements
*/
public static void setProjection(JobConf conf, String projection) throws ParseException {
conf.set(INPUT_PROJ, Schema.normalize(projection));
-
- // virtual source_table columns require sorted table
- if (Projection.getVirtualColumnIndices(projection) != null && !getSorted(conf))
- throw new ParseException("The source_table virtual column is only availabe for sorted table unions.");
}
/**
@@ -266,10 +262,6 @@ public class TableInputFormat implements
}
conf.set(INPUT_PROJ, normalizedProjectionString);
-
- // virtual source_table columns require sorted table
- if (Projection.getVirtualColumnIndices(projection.toString()) != null && !getSorted(conf))
- throw new ParseException("The source_table virtual column is only availabe for sorted table unions.");
}
/**
@@ -718,6 +710,7 @@ public class TableInputFormat implements
boolean first = true;
PathFilter filter = null;
List<BasicTable.Reader> realReaders = new ArrayList<BasicTable.Reader>();
+ int[] realReaderIndices = new int[readers.size()];
for (int i = 0; i < readers.size(); ++i) {
BasicTable.Reader reader = readers.get(i);
@@ -727,6 +720,7 @@ public class TableInputFormat implements
/* We can create input splits only if there does exist a valid column group for split.
* Otherwise, we do not create input splits. */
if (splitCGIndex >= 0) {
+ realReaderIndices[realReaders.size()] = i;
realReaders.add(reader);
if (first)
{
@@ -836,10 +830,10 @@ public class TableInputFormat implements
batches[++numBatches] = splitLen;
List<RowSplit> subSplits = reader.rowSplit(starts, lengths, paths, splitCGIndex, batches, numBatches);
-
+ int realTableIndex = realReaderIndices[tableIndex];
for (Iterator<RowSplit> it = subSplits.iterator(); it.hasNext();) {
RowSplit subSplit = it.next();
- RowTableSplit split = new RowTableSplit(reader, subSplit, conf);
+ RowTableSplit split = new RowTableSplit(reader, subSplit, realTableIndex, conf);
ret.add(split);
}
}
@@ -1050,14 +1044,16 @@ class SortedTableSplit implements InputS
*/
class RowTableSplit implements InputSplit {
String path = null;
+ int tableIndex;
RowSplit split = null;
String[] hosts = null;
long length = 1;
- public RowTableSplit(Reader reader, RowSplit split, JobConf conf)
+ public RowTableSplit(Reader reader, RowSplit split, int tableIndex, JobConf conf)
throws IOException {
this.path = reader.getPath();
this.split = split;
+ this.tableIndex = tableIndex;
BlockDistribution dataDist = reader.getBlockDistribution(split);
if (dataDist != null) {
length = dataDist.getLength();
@@ -1082,6 +1078,7 @@ class RowTableSplit implements InputSpli
@Override
public void readFields(DataInput in) throws IOException {
+ tableIndex = WritableUtils.readVInt(in);
path = WritableUtils.readString(in);
int bool = WritableUtils.readVInt(in);
if (bool == 1) {
@@ -1097,6 +1094,7 @@ class RowTableSplit implements InputSpli
@Override
public void write(DataOutput out) throws IOException {
+ WritableUtils.writeVInt(out, tableIndex);
WritableUtils.writeString(out, path);
if (split == null) {
WritableUtils.writeVInt(out, 0);
@@ -1116,4 +1114,8 @@ class RowTableSplit implements InputSpli
public RowSplit getSplit() {
return split;
}
+
+ public int getTableIndex() {
+ return tableIndex;
+ }
}
Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java?rev=947093&r1=947092&r2=947093&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java Fri May 21 17:50:16 2010
@@ -53,8 +53,6 @@ public class TableRecordReader implement
InputSplit split, JobConf conf) throws IOException, ParseException {
if( split instanceof RowTableSplit ) {
RowTableSplit rowSplit = (RowTableSplit)split;
- if( Projection.getVirtualColumnIndices( projection ) != null )
- throw new IllegalArgumentException("virtual column requires union of multiple sorted tables");
scanner = expr.getScanner(rowSplit, projection, conf);
} else {
SortedTableSplit tblSplit = (SortedTableSplit)split;
Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java?rev=947093&r1=947092&r2=947093&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java Fri May 21 17:50:16 2010
@@ -157,6 +157,13 @@ class TableUnionExpr extends CompositeTa
throw new IllegalArgumentException("virtual column requires union of multiple tables");
return new SortedTableUnionScanner(scanners, Projection.getVirtualColumnIndices(projection));
}
+
+ @Override
+ public TableScanner getScanner(RowTableSplit split, String projection,
+ Configuration conf) throws IOException, ParseException {
+ BasicTableExpr expr = (BasicTableExpr) composite.get(split.getTableIndex());
+ return expr.getScanner(split, projection, conf);
+ }
}
/**
Added: hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestUnsortedTableIndex.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestUnsortedTableIndex.java?rev=947093&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestUnsortedTableIndex.java (added)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestUnsortedTableIndex.java Fri May 21 17:50:16 2010
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapred;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TestBasicTable;
+import org.apache.hadoop.zebra.mapred.RowTableSplit;
+import org.apache.hadoop.zebra.mapred.TableInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.pig.data.Tuple;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestUnsortedTableIndex {
+ private static Configuration conf;
+ private static JobConf jobConf;
+ private static Path path1, path2;
+
+ @BeforeClass
+ public static void setUpOnce() throws IOException {
+ TestBasicTable.setUpOnce();
+ conf = TestBasicTable.conf;
+ jobConf = new JobConf(conf);
+ path1 = new Path(TestBasicTable.rootPath, "TestUnsortedTableIndex1");
+ path2 = new Path(TestBasicTable.rootPath, "TestUnsortedTableIndex2");
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ BasicTable.drop(path1, conf);
+ BasicTable.drop(path2, conf);
+ }
+
+ @Test
+ public void testUnsortedTableIndex()
+ throws IOException, ParseException, InterruptedException {
+ BasicTable.drop(path1, conf);
+ BasicTable.drop(path2, conf);
+ int total1 = TestBasicTable.createBasicTable(1, 100, "a, b, c, d, e, f", "[a, b]; [c, d]", null, path1, true);
+ int total2 = TestBasicTable.createBasicTable(1, 100, "a, b, c, d, e, f", "[a, b]; [c, d]", null, path2, true);
+
+ TableInputFormat inputFormat = new TableInputFormat();
+ TableInputFormat.setInputPaths(jobConf, path1, path2);
+ TableInputFormat.setProjection(jobConf, "source_table");
+ InputSplit[] splits = inputFormat.getSplits(jobConf, -1);
+ Assert.assertEquals(splits.length, 2);
+ for (int i = 0; i < 2; i++)
+ {
+ int count = 0;
+ RowTableSplit split = (RowTableSplit) splits[i];
+ TableRecordReader rr = (TableRecordReader) inputFormat.getRecordReader(split, jobConf, null);
+ Tuple t = TypesUtils.createTuple(1);
+ BytesWritable key = new BytesWritable();
+ while (rr.next(key, t)) {
+ int idx= (Integer) t.get(0);
+ Assert.assertEquals(idx, i);
+ count++;
+ }
+ rr.close();
+ if (i == 0)
+ Assert.assertEquals(count, total1);
+ else
+ Assert.assertEquals(count, total2);
+ }
+ }
+}
Propchange: hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestUnsortedTableIndex.java
------------------------------------------------------------------------------
svn:executable = *