Posted to commits@pig.apache.org by ya...@apache.org on 2010/02/13 01:06:17 UTC
svn commit: r909667 [1/9] - in
/hadoop/pig/branches/load-store-redesign/contrib/zebra: ./
src/java/org/apache/hadoop/zebra/ src/java/org/apache/hadoop/zebra/io/
src/java/org/apache/hadoop/zebra/mapred/
src/java/org/apache/hadoop/zebra/mapreduce/ src/ja...
Author: yanz
Date: Sat Feb 13 00:06:15 2010
New Revision: 909667
URL: http://svn.apache.org/viewvc?rev=909667&view=rev
Log:
PIG-1140 Support of Hadoop 2.0 API (xuefuz via yanz)
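(For orientation: assembled from the usage example in the new BasicTableOutputFormat javadoc below, a minimal write-side driver against the new org.apache.hadoop.zebra.mapreduce API might look like the following sketch; the job name, output path, and schema are illustrative, not part of the commit.)

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.zebra.mapreduce.BasicTableOutputFormat;

    public class ZebraWriteDriver {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "zebra-write");
        job.setOutputFormatClass(BasicTableOutputFormat.class);
        // Single output table; the path must be nonexistent or an empty directory.
        BasicTableOutputFormat.setOutputPath(job, new Path("path/to/the/BasicTable"));
        BasicTableOutputFormat.setSchema(job, "Name, Age, Salary, BonusPct");
        // ... set a Mapper/Reducer that emits <BytesWritable, Tuple> pairs ...
        job.waitForCompletion(true);
        BasicTableOutputFormat.close(job);  // makes the table visible to readers
      }
    }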
Added:
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/CachedTableScanner.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/CompositeTableExpr.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/NullScanner.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExprUtils.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraOutputPartition.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraProjection.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraSchema.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraSortInfo.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraStorageHint.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/package-info.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ArticleGenerator.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/Dictionary.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TableMRSample.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TableMRSample1.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TableMRSample2.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TableMRSampleSortedTable.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TableMRSortedTableZebraKeyGenerator.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TableMapReduceExample.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestBasicTableIOFMoreLocal.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestBasicTableIOFormatDFS.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestBasicTableIOFormatLocalFS.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestCheckin.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestMultipleOutputs.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestMultipleOutputs2.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestMultipleOutputs2TypedApi.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestMultipleOutputs3.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestMultipleOutputs3TypedApi.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestMultipleOutputs4.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestMultipleOutputs4TypedApi.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestMultipleOutputsTypeApi.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestMultipleOutputsTypedApiNeg.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTfileSplit.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTypedApi.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTypedApi2.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ToolTestComparator.java
Modified:
hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt
hadoop/pig/branches/load-store-redesign/contrib/zebra/build.xml
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/package-info.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/package-info.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/SchemaConverter.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs2.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs2TypedApi.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs3.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs3TypedApi.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs4.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs4TypedApi.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputsTypeApi.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputsTypedApiNeg.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestTypedApi.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestTypedApi2.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoin.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoinPartial.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjection.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjectionNegative.java
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt Sat Feb 13 00:06:15 2010
@@ -12,6 +12,8 @@
IMPROVEMENTS
+ PIG-1140 Support of Hadoop 2.0 API (xuefuz via yanz)
+
PIG-1170 new end-to-end and stress test cases (jing1234 via yanz)
PIG-1136 Support of map split on hash keys with leading underscore (xuefuz via yanz)
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/build.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/build.xml?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/build.xml (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/build.xml Sat Feb 13 00:06:15 2010
@@ -106,6 +106,8 @@
<echo message="contrib: ${name}"/>
<delete dir="${pig.log.dir}"/>
<mkdir dir="${pig.log.dir}"/>
+ <delete dir="${build.test}/data"/>
+ <mkdir dir="${build.test}/data"/>
<junit
printsummary="yes" showoutput="${test.output}"
haltonfailure="no" fork="yes" maxmemory="1024m"
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java Sat Feb 13 00:06:15 2010
@@ -49,7 +49,6 @@
import org.apache.hadoop.zebra.tfile.MetaBlockAlreadyExists;
import org.apache.hadoop.zebra.tfile.MetaBlockDoesNotExist;
import org.apache.hadoop.zebra.tfile.Utils.Version;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.zebra.io.ColumnGroup.Reader.CGRangeSplit;
import org.apache.hadoop.zebra.io.ColumnGroup.Reader.CGRowSplit;
import org.apache.hadoop.zebra.io.ColumnGroup.Reader.CGScanner;
@@ -90,8 +89,6 @@
new Version((short) 1, (short) 1);
// name of the BasicTable meta-data file
private final static String BT_META_FILE = ".btmeta";
- // column group prefix
- private final static String CGPathPrefix = "CG";
private final static String DELETED_CG_PREFIX = ".deleted-";
@@ -815,7 +812,8 @@
* A row-based split on the zebra table;
*/
public static class RowSplit implements Writable {
- int cgIndex; // column group index where split lies on;
+
+ int cgIndex; // column group index where split lies on;
CGRowSplit slice;
RowSplit(int cgidx, CGRowSplit split) {
@@ -1451,7 +1449,7 @@
}
}
}
-
+
/**
* Get the schema of the table.
*
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java Sat Feb 13 00:06:15 2010
@@ -392,10 +392,10 @@
"Cannot get key-bounded scanner for unsorted table");
}
RawComparable begin =
- (beginKey != null) ? new ByteArray(beginKey.get(), 0, beginKey
- .getSize()) : null;
+ (beginKey != null) ? new ByteArray(beginKey.getBytes(), 0, beginKey
+ .getLength()) : null;
RawComparable end =
- (endKey != null) ? new ByteArray(endKey.get(), 0, endKey.getSize())
+ (endKey != null) ? new ByteArray(endKey.getBytes(), 0, endKey.getLength())
: null;
if (begin != null && end != null) {
if (comparator.compare(begin, end) >= 0) {
@@ -800,7 +800,8 @@
scanner = reader.createScannerByRecordNum(rowRange.startRow,
rowRange.startRow + rowRange.numRows);
} else {
- /* using deprecated API just so that zebra can work with
+ /* TODO: more investigation is needed for the following.
+ * using deprecated API just so that zebra can work with
* hadoop jar that does not contain HADOOP-6218 (Record ids for
* TFile). This is expected to be temporary. Later we should
* use the undeprecated API.
@@ -865,7 +866,7 @@
}
boolean seekTo(BytesWritable key) throws IOException {
- return scanner.seekTo(key.get(), 0, key.getSize());
+ return scanner.seekTo(key.getBytes(), 0, key.getLength());
}
boolean advance() throws IOException {
@@ -1093,7 +1094,7 @@
return false;
}
int index =
- cgindex.lowerBound(new ByteArray(key.get(), 0, key.getSize()),
+ cgindex.lowerBound(new ByteArray(key.getBytes(), 0, key.getLength()),
comparator);
if (index >= endIndex) {
seekToEnd();
@@ -1596,9 +1597,9 @@
@Override
public void insert(BytesWritable key, Tuple row) throws IOException {
TypesUtils.checkCompatible(row, getSchema());
- DataOutputStream outKey = tfileWriter.prepareAppendKey(key.getSize());
+ DataOutputStream outKey = tfileWriter.prepareAppendKey(key.getLength());
try {
- outKey.write(key.get(), 0, key.getSize());
+ outKey.write(key.getBytes(), 0, key.getLength());
}
finally {
outKey.close();
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java Sat Feb 13 00:06:15 2010
@@ -17,8 +17,6 @@
package org.apache.hadoop.zebra.io;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
import java.io.PrintStream;
/**
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java Sat Feb 13 00:06:15 2010
@@ -150,7 +150,7 @@
* Get the block distribution of all data that maps to the key bucket.
*/
public BlockDistribution getBlockDistribution(BytesWritable key) {
- ByteArray key0 = new ByteArray(key.get(), 0, key.getSize());
+ ByteArray key0 = new ByteArray(key.getBytes(), 0, key.getLength());
BlockDistribution bInfo = data.get(key0);
if (bInfo == null) {
throw new IllegalArgumentException("Invalid key");
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java Sat Feb 13 00:06:15 2010
@@ -22,7 +22,6 @@
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.zebra.types.Projection;
import org.apache.hadoop.zebra.schema.Schema;
import org.apache.pig.data.Tuple;
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java Sat Feb 13 00:06:15 2010
@@ -140,7 +140,10 @@
*
* }
* </pre>
+ *
+ * @deprecated Use {@link org.apache.hadoop.zebra.mapreduce.BasicTableOutputFormat} instead
*/
+@Deprecated
public class BasicTableOutputFormat implements
OutputFormat<BytesWritable, Tuple> {
private static final String OUTPUT_PATH = "mapred.lib.table.output.dir";
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java Sat Feb 13 00:06:15 2010
@@ -136,7 +136,10 @@
* <LI>{@link DataBag} : A DataBag is a collection of Tuples.
* <LI>{@link Tuple} : Yes, Tuple itself can be a datum in another Tuple.
* </UL>
+ *
+ * @deprecated Use {@link org.apache.hadoop.zebra.mapreduce.TableInputFormat} instead
*/
+@Deprecated
public class TableInputFormat implements InputFormat<BytesWritable, Tuple> {
static Log LOG = LogFactory.getLog(TableInputFormat.class);
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java Sat Feb 13 00:06:15 2010
@@ -30,7 +30,10 @@
/**
* Adaptor class to implement RecordReader on top of Scanner.
+ *
+ * @deprecated Use {@link org.apache.hadoop.zebra.mapreduce.TableRecordReader} instead
*/
+@Deprecated
public class TableRecordReader implements RecordReader<BytesWritable, Tuple> {
private final TableScanner scanner;
private long count = 0;
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/package-info.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/package-info.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/package-info.java Sat Feb 13 00:06:15 2010
@@ -19,6 +19,8 @@
* Providing {@link org.apache.hadoop.mapred.InputFormat} and
* {@link org.apache.hadoop.mapred.OutputFormat} adaptor classes for Hadoop
* Zebra Table.
+ *
+ * This package is deprecated. Please use org.apache.hadoop.zebra.mapreduce instead.
*/
package org.apache.hadoop.zebra.mapred;
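(To make the deprecation path concrete: a rough mapping between the old mapred wiring and the new mapreduce wiring, as a sketch only; the JobConf call is standard Hadoop API, and only the driver-side setup is shown.)

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.Job;

    class ApiMigrationSketch {
      static void oldStyle() {
        // Deprecated: JobConf-based setup via org.apache.hadoop.zebra.mapred
        JobConf jobConf = new JobConf();
        jobConf.setOutputFormat(org.apache.hadoop.zebra.mapred.BasicTableOutputFormat.class);
      }

      static void newStyle() throws Exception {
        // Replacement: Job/JobContext-based setup via org.apache.hadoop.zebra.mapreduce;
        // the static setters on the new classes take the Job (a JobContext) instead.
        Job job = new Job(new Configuration(), "zebra-job");
        job.setOutputFormatClass(org.apache.hadoop.zebra.mapreduce.BasicTableOutputFormat.class);
      }
    }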
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.schema.Schema;
+
+/**
+ * Table expression for reading a BasicTable.
+ *
+ * @see <a href="doc-files/examples/ReadABasicTable.java">Usage example for
+ * BasicTableExpr</a>
+ */
+class BasicTableExpr extends TableExpr {
+ private Path path;
+
+ /**
+ * default constructor.
+ */
+ public BasicTableExpr() {
+ // no-op
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param path
+ * Path of the BasicTable.
+ */
+ public BasicTableExpr(Path path) {
+ this.path = path;
+ }
+
+ /**
+ * Set the path.
+ *
+ * @param path
+ * path to the BasicTable.
+ * @return self.
+ */
+ public BasicTableExpr setPath(Path path) {
+ this.path = path;
+ return this;
+ }
+
+ /**
+ * Get the path.
+ *
+ * @return the path to the BasicTable.
+ */
+ public Path getPath() {
+ return path;
+ }
+
+ @Override
+ protected BasicTableExpr decodeParam(StringReader in) throws IOException {
+ String strPath = TableExprUtils.decodeString(in);
+ if (strPath == null) {
+ throw new RuntimeException("Incomplete expression");
+ }
+ path = new Path(strPath);
+ return this;
+ }
+
+ @Override
+ protected BasicTableExpr encodeParam(StringBuilder out) {
+ if (path == null) {
+ throw new RuntimeException("Incomplete expression");
+ }
+ TableExprUtils.encodeString(out, path.toString());
+ return this;
+ }
+
+ @Override
+ public List<LeafTableInfo> getLeafTables(String projection) {
+ ArrayList<LeafTableInfo> ret = new ArrayList<LeafTableInfo>(1);
+ ret.add(new LeafTableInfo(path, projection));
+ return ret;
+ }
+
+ @Override
+ public TableScanner getScanner(BytesWritable begin, BytesWritable end,
+ String projection, Configuration conf) throws IOException {
+ BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+ try {
+ reader.setProjection(projection);
+ } catch (ParseException e) {
+ throw new IOException("Projection parsing failed : "+e.getMessage());
+ }
+ return reader.getScanner(begin, end, true);
+ }
+
+ @Override
+ public Schema getSchema(Configuration conf) throws IOException {
+ return BasicTable.Reader.getSchema(path, conf);
+ }
+
+ @Override
+ public boolean sortedSplitCapable() {
+ return true;
+ }
+
+ @Override
+ protected void dumpInfo(PrintStream ps, Configuration conf, int indent) throws IOException
+ {
+ BasicTable.dumpInfo(path.toString(), ps, conf, indent);
+ }
+}
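(A package-internal sketch of how this expression class is meant to be used, assuming null begin/end keys denote an unbounded scan, as the surrounding scanner code suggests; the table path and projection are illustrative.)

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.zebra.io.TableScanner;

    // Must live in org.apache.hadoop.zebra.mapreduce: BasicTableExpr is package-private.
    class BasicTableExprSketch {
      static TableScanner openScanner(Configuration conf) throws IOException {
        TableExpr expr = new BasicTableExpr(new Path("path/to/table"));
        // Project two columns; null keys scan the whole key range.
        return expr.getScanner(null, null, "Name, Age", conf);
      }
    }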
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,775 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.types.Partition;
+import org.apache.hadoop.zebra.types.SortInfo;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.tfile.TFile;
+import org.apache.pig.data.Tuple;
+import org.apache.hadoop.zebra.pig.comparator.*;
+
+
+/**
+ * {@link org.apache.hadoop.mapreduce.OutputFormat} class for creating a
+ * BasicTable.
+ *
+ * Usage Example:
+ * <p>
+ * In the main program, add the following code.
+ *
+ * <pre>
+ * job.setOutputFormatClass(BasicTableOutputFormat.class);
+ * Path outPath = new Path("path/to/the/BasicTable");
+ * BasicTableOutputFormat.setOutputPath(job, outPath);
+ * BasicTableOutputFormat.setSchema(job, "Name, Age, Salary, BonusPct");
+ * </pre>
+ *
+ * The above code does the following things:
+ * <UL>
+ * <LI>Set the output format class to BasicTableOutputFormat.
+ * <LI>Set the single path to the BasicTable to be created.
+ * <LI>Set the schema of the BasicTable to be created. In this case, the
+ * to-be-created BasicTable contains four columns with names "Name", "Age",
+ * "Salary", "BonusPct".
+ * </UL>
+ *
+ * To create multiple output paths, the ZebraOutputPartition interface needs
+ * to be implemented:
+ * <pre>
+ * String multiLocs = "commaSeparatedPaths";
+ * job.setOutputFormatClass(BasicTableOutputFormat.class);
+ * BasicTableOutputFormat.setMultipleOutputs(job, multiLocs,
+ *     MultipleOutputsTest.OutputPartitionerClass.class);
+ * BasicTableOutputFormat.setSchema(job, "Name, Age, Salary, BonusPct");
+ * </pre>
+ *
+ *
+ * The user's ZebraOutputPartition class should look like this:
+ *
+ * <pre>
+ *
+ * static class OutputPartitionerClass implements ZebraOutputPartition {
+ * @Override
+ * public int getOutputPartition(BytesWritable key, Tuple value) {
+ *
+ * return someIndexInOutputPartitionList;
+ * }
+ * }
+ * </pre>
+ *
+ *
+ * The user Reducer code (or similarly Mapper code if it is a Map-only job)
+ * should look like the following:
+ *
+ * <pre>
+ * static class MyReduceClass implements Reducer<K, V, BytesWritable, Tuple> {
+ * // keep the tuple object for reuse.
+ * Tuple outRow;
+ * // indices of various fields in the output Tuple.
+ * int idxName, idxAge, idxSalary, idxBonusPct;
+ *
+ * @Override
+ * public void configure(Job job) {
+ * Schema outSchema = BasicTableOutputFormat.getSchema(job);
+ * // create a tuple that conforms to the output schema.
+ * outRow = TypesUtils.createTuple(outSchema);
+ * // determine the field indices.
+ * idxName = outSchema.getColumnIndex("Name");
+ * idxAge = outSchema.getColumnIndex("Age");
+ * idxSalary = outSchema.getColumnIndex("Salary");
+ * idxBonusPct = outSchema.getColumnIndex("BonusPct");
+ * }
+ *
+ * @Override
+ * public void reduce(K key, Iterator<V> values,
+ * OutputCollector<BytesWritable, Tuple> output, Reporter reporter)
+ * throws IOException {
+ * String name;
+ * int age;
+ * int salary;
+ * double bonusPct;
+ * // ... Determine the value of the individual fields of the row to be inserted.
+ * try {
+ * outRow.set(idxName, name);
+ * outRow.set(idxAge, new Integer(age));
+ * outRow.set(idxSalary, new Integer(salary));
+ * outRow.set(idxBonusPct, new Double(bonusPct));
+ * output.collect(new BytesWritable(name.getBytes()), outRow);
+ * }
+ * catch (ExecException e) {
+ * // should never happen
+ * }
+ * }
+ *
+ * @Override
+ * public void close() throws IOException {
+ * // no-op
+ * }
+ *
+ * }
+ * </pre>
+ */
+public class BasicTableOutputFormat extends OutputFormat<BytesWritable, Tuple> {
+ private static final String OUTPUT_PATH = "mapreduce.lib.table.output.dir";
+ public static final String MULTI_OUTPUT_PATH = "mapreduce.lib.table.multi.output.dirs";
+ private static final String OUTPUT_SCHEMA = "mapreduce.lib.table.output.schema";
+ private static final String OUTPUT_STORAGEHINT = "mapreduce.lib.table.output.storagehint";
+ private static final String OUTPUT_SORTCOLUMNS = "mapreduce.lib.table.output.sortcolumns";
+ private static final String OUTPUT_COMPARATOR = "mapreduce.lib.table.output.comparator";
+ static final String IS_MULTI = "multi";
+ public static final String ZEBRA_OUTPUT_PARTITIONER_CLASS = "zebra.output.partitioner.class";
+
+
+ /**
+ * Set the multiple output paths of the BasicTable in JobContext
+ *
+ * @param jobContext
+ * The JobContext object.
+ * @param commaSeparatedLocations
+ * The comma separated output paths to the tables.
+ * Each path must either be nonexistent or be an empty directory.
+ * @param theClass
+ * Zebra output partitioner class
+ */
+
+ public static void setMultipleOutputs(JobContext jobContext, String commaSeparatedLocations, Class<? extends ZebraOutputPartition> theClass)
+ throws IOException {
+ Configuration conf = jobContext.getConfiguration();
+ conf.set(MULTI_OUTPUT_PATH, commaSeparatedLocations);
+
+ if(conf.getBoolean(IS_MULTI, true) == false) {
+ throw new IllegalArgumentException("Job has been setup as single output path");
+ }
+ conf.setBoolean(IS_MULTI, true);
+ setZebraOutputPartitionClass(jobContext, theClass);
+ }
+
+ /**
+ * Set the multiple output paths of the BasicTable in JobContext
+ *
+ * @param jobContext
+ * The JobContext object.
+ * @param paths
+ * The list of output paths.
+ * Each path must either be nonexistent or be an empty directory.
+ * @param theClass
+ * Zebra output partitioner class
+ */
+
+ public static void setMultipleOutputs(JobContext jobContext, Class<? extends ZebraOutputPartition> theClass, Path... paths)
+ throws IOException {
+ Configuration conf = jobContext.getConfiguration();
+ FileSystem fs = FileSystem.get( conf );
+ Path path = paths[0].makeQualified(fs);
+ StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
+ for(int i = 1; i < paths.length;i++) {
+ str.append(StringUtils.COMMA_STR);
+ path = paths[i].makeQualified(fs);
+ str.append(StringUtils.escapeString(path.toString()));
+ }
+ conf.set(MULTI_OUTPUT_PATH, str.toString());
+
+ if(conf.getBoolean(IS_MULTI, true) == false) {
+ throw new IllegalArgumentException("Job has been setup as single output path");
+ }
+ conf.setBoolean(IS_MULTI, true);
+ setZebraOutputPartitionClass(jobContext, theClass);
+
+ }
+
+
+ /**
+ * Get the multiple output paths of the BasicTable from JobContext
+ *
+ * @param jobContext
+ * The JobContext object.
+ * @return The output paths to the tables. Each path must either be
+ * nonexistent or be an empty directory.
+ */
+
+ public static Path[] getOutputPaths(JobContext jobContext)
+ throws IOException {
+ Configuration conf = jobContext.getConfiguration();
+
+ Path[] result;
+ String paths = conf.get(MULTI_OUTPUT_PATH);
+ String path = conf.get(OUTPUT_PATH);
+
+ if(paths != null && path != null) {
+ throw new IllegalArgumentException("Illegal output paths specs. Both multi and single output locs are set");
+ }
+
+ if(conf.getBoolean(IS_MULTI, false) == true) {
+ if (paths == null || paths.equals("")) {
+ throw new IllegalArgumentException("Illegal multi output paths");
+ }
+ String [] list = StringUtils.split(paths);
+ result = new Path[list.length];
+ for (int i = 0; i < list.length; i++) {
+ result[i] = new Path(StringUtils.unEscapeString(list[i]));
+ }
+ } else {
+ if (path == null || path.equals("")) {
+ throw new IllegalArgumentException("Cannot find output path");
+ }
+ result = new Path[1];
+ result[0] = new Path(path);
+ }
+
+ return result;
+
+ }
+
+
+ private static void setZebraOutputPartitionClass(
+ JobContext jobContext, Class<? extends ZebraOutputPartition> theClass) throws IOException {
+ if (!ZebraOutputPartition.class.isAssignableFrom(theClass))
+ throw new IOException(theClass+" not "+ZebraOutputPartition.class.getName());
+ jobContext.getConfiguration().set(ZEBRA_OUTPUT_PARTITIONER_CLASS, theClass.getName());
+ }
+
+
+ public static Class<? extends ZebraOutputPartition> getZebraOutputPartitionClass(JobContext jobContext) throws IOException {
+ Configuration conf = jobContext.getConfiguration();
+
+ Class<?> theClass;
+ String valueString = conf.get(ZEBRA_OUTPUT_PARTITIONER_CLASS);
+ if (valueString == null)
+ throw new IOException("zebra output partitioner class not found");
+ try {
+ theClass = conf.getClassByName(valueString);
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+
+ if (theClass != null && !ZebraOutputPartition.class.isAssignableFrom(theClass))
+ throw new IOException(theClass+" not "+ZebraOutputPartition.class.getName());
+ else if (theClass != null)
+ return theClass.asSubclass(ZebraOutputPartition.class);
+ else
+ return null;
+
+ }
+
+
+
+ /**
+ * Set the output path of the BasicTable in JobContext
+ *
+ * @param jobContext
+ * The JobContext object.
+ * @param path
+ * The output path to the table. The path must either be nonexistent,
+ * or be an empty directory.
+ */
+ public static void setOutputPath(JobContext jobContext, Path path) {
+ Configuration conf = jobContext.getConfiguration();
+ conf.set(OUTPUT_PATH, path.toString());
+ if(conf.getBoolean(IS_MULTI, false) == true) {
+ throw new IllegalArgumentException("Job has been setup as multi output paths");
+ }
+ conf.setBoolean(IS_MULTI, false);
+
+ }
+
+ /**
+ * Get the output path of the BasicTable from JobContext
+ *
+ * @param jobContext
+ * jobContext object
+ * @return The output path.
+ */
+ public static Path getOutputPath(JobContext jobContext) {
+ Configuration conf = jobContext.getConfiguration();
+ String path = conf.get(OUTPUT_PATH);
+ return (path == null) ? null : new Path(path);
+ }
+
+ /**
+ * Set the table schema in JobContext
+ *
+ * @param jobContext
+ * The JobContext object.
+ * @param schema
+ * The schema of the BasicTable to be created. For the initial
+ * implementation, the schema string is simply a comma separated list
+ * of column names, such as "Col1, Col2, Col3".
+ *
+ * @deprecated Use {@link #setStorageInfo(JobContext, ZebraSchema, ZebraStorageHint, ZebraSortInfo)} instead.
+ */
+ public static void setSchema(JobContext jobContext, String schema) {
+ Configuration conf = jobContext.getConfiguration();
+ conf.set(OUTPUT_SCHEMA, Schema.normalize(schema));
+ }
+
+ /**
+ * Get the table schema in JobContext.
+ *
+ * @param jobContext
+ * The JobContext object.
+ * @return The output schema of the BasicTable. If the schema is not defined
+ * in the jobContext object at the time of the call, null will be returned.
+ */
+ public static Schema getSchema(JobContext jobContext) throws ParseException {
+ Configuration conf = jobContext.getConfiguration();
+ String schema = conf.get(OUTPUT_SCHEMA);
+ if (schema == null) {
+ return null;
+ }
+ //schema = schema.replaceAll(";", ",");
+ return new Schema(schema);
+ }
+
+ private static KeyGenerator makeKeyBuilder(byte[] elems) {
+ ComparatorExpr[] exprs = new ComparatorExpr[elems.length];
+ for (int i = 0; i < elems.length; ++i) {
+ exprs[i] = ExprUtils.primitiveComparator(i, elems[i]);
+ }
+ return new KeyGenerator(ExprUtils.tupleComparator(exprs));
+ }
+
+ /**
+ * Generates a Zebra-specific sort key generator, which is used to generate
+ * BytesWritable keys from the table's sort key column(s).
+ *
+ * @param jobContext
+ * The JobContext object.
+ * @return Object of type org.apache.hadoop.zebra.pig.comparator.KeyGenerator.
+ *
+ */
+ public static Object getSortKeyGenerator(JobContext jobContext) throws IOException, ParseException {
+ SortInfo sortInfo = getSortInfo( jobContext );
+ Schema schema = getSchema(jobContext);
+ String[] sortColNames = sortInfo.getSortColumnNames();
+
+ byte[] types = new byte[sortColNames.length];
+ for(int i =0 ; i < sortColNames.length; ++i){
+ types[i] = schema.getColumn(sortColNames[i]).getType().pigDataType();
+ }
+ KeyGenerator builder = makeKeyBuilder(types);
+ return builder;
+
+ }
+
+
+ /**
+ * Generates a BytesWritable key for the input tuple
+ * using the key generator provided.
+ *
+ * @param builder
+ * Opaque key generator created by getSortKeyGenerator() method
+ * @param t
+ * Tuple to create sort key from
+ * @return BytesWritable key
+ *
+ */
+ public static BytesWritable getSortKey(Object builder, Tuple t) throws Exception {
+ KeyGenerator kg = (KeyGenerator) builder;
+ return kg.generateKey(t);
+ }
+
+
+
+
+ /**
+ * Set the table storage hint in JobContext, should be called after setSchema is
+ * called.
+ * <br> <br>
+ *
+ * Note that the "secure by" feature is experimental now and subject to
+ * changes in the future.
+ *
+ * @param jobContext
+ * The JobContext object.
+ * @param storehint
+ * The storage hint of the BasicTable to be created. The format would
+ * be like "[f1, f2.subfld]; [f3, f4]".
+ *
+ * @deprecated Use {@link #setStorageInfo(JobContext, ZebraSchema, ZebraStorageHint, ZebraSortInfo)} instead.
+ */
+ public static void setStorageHint(JobContext jobContext, String storehint) throws ParseException, IOException {
+ Configuration conf = jobContext.getConfiguration();
+ String schema = conf.get(OUTPUT_SCHEMA);
+
+ if (schema == null)
+ throw new ParseException("Schema has not been set");
+
+ // for sanity check purpose only
+ new Partition(schema, storehint, null);
+
+ conf.set(OUTPUT_STORAGEHINT, storehint);
+ }
+
+ /**
+ * Get the table storage hint in JobContext.
+ *
+ * @param jobContext
+ * The JobContext object.
+ * @return The storage hint of the BasicTable. If the storage hint is not
+ * defined in the jobContext object at the time of the call, an empty string
+ * will be returned.
+ */
+ public static String getStorageHint(JobContext jobContext) throws ParseException {
+ Configuration conf = jobContext.getConfiguration();
+ String storehint = conf.get(OUTPUT_STORAGEHINT);
+ return storehint == null ? "" : storehint;
+ }
+
+ /**
+ * Set the sort info
+ *
+ * @param jobContext
+ * The JobContext object.
+ *
+ * @param sortColumns
+ * Comma-separated sort column names
+ *
+ * @param comparatorClass
+ * comparator class name; null for default
+ *
+ * @deprecated Use {@link #setStorageInfo(JobContext, ZebraSchema, ZebraStorageHint, ZebraSortInfo)} instead.
+ */
+ public static void setSortInfo(JobContext jobContext, String sortColumns, Class<? extends RawComparator<Object>> comparatorClass) {
+ Configuration conf = jobContext.getConfiguration();
+ conf.set(OUTPUT_SORTCOLUMNS, sortColumns);
+ if (comparatorClass != null)
+ conf.set(OUTPUT_COMPARATOR, TFile.COMPARATOR_JCLASS+comparatorClass.getName());
+ }
+
+ /**
+ * Set the sort info
+ *
+ * @param jobContext
+ * The JobContext object.
+ *
+ * @param sortColumns
+ * Comma-separated sort column names
+ *
+ * @deprecated Use {@link #setStorageInfo(JobContext, ZebraSchema, ZebraStorageHint, ZebraSortInfo)} instead.
+ */
+ public static void setSortInfo(JobContext jobContext, String sortColumns) {
+ jobContext.getConfiguration().set(OUTPUT_SORTCOLUMNS, sortColumns);
+ }
+
+ /**
+ * Set the table storage info, including schema, storage hint and sort info.
+ *
+ * @param jobContext
+ * The JobContext object.
+ *
+ * @param zSchema The ZebraSchema object containing schema information.
+ *
+ * @param zStorageHint The ZebraStorageHint object containing storage hint information.
+ *
+ * @param zSortInfo The ZebraSortInfo object containing sorting information.
+ *
+ */
+ public static void setStorageInfo(JobContext jobContext, ZebraSchema zSchema, ZebraStorageHint zStorageHint, ZebraSortInfo zSortInfo)
+ throws ParseException, IOException {
+ String schemaStr = null;
+ String storageHintStr = null;
+
+ /* validity check on schema*/
+ if (zSchema == null) {
+ throw new IllegalArgumentException("ZebraSchema object cannot be null.");
+ } else {
+ schemaStr = zSchema.toString();
+ }
+
+ Schema schema = null;
+ try {
+ schema = new Schema(schemaStr);
+ } catch (ParseException e) {
+ throw new ParseException("[" + zSchema + "] " + " is not a valid schema string: " + e.getMessage());
+ }
+
+ /* validity check on storage hint*/
+ if (zStorageHint == null) {
+ storageHintStr = "";
+ } else {
+ storageHintStr = zStorageHint.toString();
+ }
+
+ try {
+ new Partition(schemaStr, storageHintStr, null);
+ } catch (ParseException e) {
+ throw new ParseException("[" + zStorageHint + "] " + " is not a valid storage hint string: " + e.getMessage() );
+ } catch (IOException e) {
+ throw new ParseException("[" + zStorageHint + "] " + " is not a valid storage hint string: " + e.getMessage() );
+ }
+
+ Configuration conf = jobContext.getConfiguration();
+ conf.set(OUTPUT_SCHEMA, schemaStr);
+ conf.set(OUTPUT_STORAGEHINT, storageHintStr);
+
+ /* validity check on sort info if user specifies it */
+ if (zSortInfo != null) {
+ String sortColumnsStr = zSortInfo.getSortColumns();
+ String comparatorStr = zSortInfo.getComparator();
+
+ /* Check existence of comparable class if user specifies it */
+ if (comparatorStr != null && comparatorStr != "") {
+ try {
+ conf.getClassByName(comparatorStr.substring(TFile.COMPARATOR_JCLASS.length()).trim());
+ } catch (ClassNotFoundException e) {
+ throw new IOException("comparator Class cannot be found : " + e.getMessage());
+ }
+ }
+
+ try {
+ SortInfo.parse(sortColumnsStr, schema, comparatorStr);
+ } catch (IOException e) {
+ throw new IOException("[" + sortColumnsStr + " + " + comparatorStr + "] "
+ + "is not a valid sort configuration: " + e.getMessage());
+ }
+
+ if (sortColumnsStr != null)
+ conf.set(OUTPUT_SORTCOLUMNS, sortColumnsStr);
+ if (comparatorStr != null)
+ conf.set(OUTPUT_COMPARATOR, comparatorStr);
+ }
+ }
+
+ /**
+ * Get the SortInfo object
+ *
+ * @param jobContext
+ * The JobContext object.
+ * @return SortInfo object; null if the Zebra table is unsorted
+ *
+ */
+ public static SortInfo getSortInfo(JobContext jobContext)throws IOException
+ {
+ Configuration conf = jobContext.getConfiguration();
+ String sortColumns = conf.get(OUTPUT_SORTCOLUMNS);
+ if (sortColumns == null)
+ return null;
+ Schema schema = null;
+ try {
+ schema = getSchema(jobContext);
+ } catch (ParseException e) {
+ throw new IOException("Schema parsing failure : "+e.getMessage());
+ }
+ if (schema == null)
+ throw new IOException("Schema not defined");
+ String comparator = getComparator(jobContext);
+ return SortInfo.parse(sortColumns, schema, comparator);
+ }
+
+ /**
+ * Get the comparator for sort columns
+ *
+ * @param jobContext
+ * The JobContext object.
+ * @return comparator String
+ *
+ */
+ private static String getComparator(JobContext jobContext)
+ {
+ return jobContext.getConfiguration().get(OUTPUT_COMPARATOR);
+ }
+
+ /**
+ * Get the output table as specified in JobContext. It is useful for applications
+ * to add more meta data after all rows have been added to the table.
+ *
+ * @param jobContext
+ * The JobContext object.
+ * @return The output BasicTable.Writer object.
+ * @throws IOException
+ */
+ private static BasicTable.Writer[] getOutput(JobContext jobContext) throws IOException {
+ Path[] paths = getOutputPaths(jobContext);
+ BasicTable.Writer[] writers = new BasicTable.Writer[paths.length];
+ for(int i = 0; i < paths.length; i++) {
+ writers[i] = new BasicTable.Writer(paths[i], jobContext.getConfiguration());
+ }
+
+ return writers;
+ }
+
+ /**
+ * Note: we perform the initialization of the table here. So we expect this to
+ * be called before
+ * {@link BasicTableOutputFormat#getRecordWriter(TaskAttemptContext)}
+ *
+ * @see OutputFormat#checkOutputSpecs(JobContext)
+ */
+ @Override
+ public void checkOutputSpecs(JobContext jobContext)
+ throws IOException {
+ Configuration conf = jobContext.getConfiguration();
+ String schema = conf.get(OUTPUT_SCHEMA);
+ if (schema == null) {
+ throw new IllegalArgumentException("Cannot find output schema");
+ }
+ String storehint, sortColumns, comparator;
+ try {
+ storehint = getStorageHint(jobContext);
+ sortColumns = (getSortInfo(jobContext) == null ? null : SortInfo.toSortString(getSortInfo(jobContext).getSortColumnNames()));
+ comparator = getComparator( jobContext );
+ }
+ catch (ParseException e) {
+ throw new IOException(e);
+ }
+
+ Path [] paths = getOutputPaths(jobContext);
+
+ for(int i = 0; i < paths.length; ++i) {
+ BasicTable.Writer writer =
+ new BasicTable.Writer(paths[i], schema, storehint, sortColumns, comparator, conf);
+ writer.finish();
+ }
+ }
+
+ /**
+ * @see OutputFormat#getRecordWriter(TaskAttemptContext)
+ */
+ @Override
+ public RecordWriter<BytesWritable, Tuple> getRecordWriter(TaskAttemptContext taContext)
+ throws IOException {
+ String path = taContext.getConfiguration().get(OUTPUT_PATH);
+ return new TableRecordWriter(path, taContext);
+ }
+
+ /**
+ * Close the output BasicTable. No more rows can be added into the table. A
+ * BasicTable is not visible for reading until it is "closed".
+ *
+ * @param jobContext
+ * The JobContext object.
+ * @throws IOException
+ */
+ public static void close(JobContext jobContext) throws IOException {
+ BasicTable.Writer tables[] = getOutput(jobContext);
+ for(int i =0; i < tables.length; ++i) {
+ tables[i].close();
+ }
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext taContext)
+ throws IOException, InterruptedException {
+ return new TableOutputCommitter( taContext ) ;
+ }
+}
+
+class TableOutputCommitter extends OutputCommitter {
+ public TableOutputCommitter(TaskAttemptContext taContext) {
+
+ }
+
+ @Override
+ public void abortTask(TaskAttemptContext taContext) throws IOException {
+ }
+
+ @Override
+ public void cleanupJob(JobContext jobContext) throws IOException {
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext taContext) throws IOException {
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext taContext)
+ throws IOException {
+ return false;
+ }
+
+ @Override
+ public void setupJob(JobContext jobContext) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext taContext) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+}
+
+/**
+ * Adaptor class for BasicTable RecordWriter.
+ */
+class TableRecordWriter extends RecordWriter<BytesWritable, Tuple> {
+ private final TableInserter inserter[];
+ private org.apache.hadoop.zebra.mapreduce.ZebraOutputPartition op = null;
+
+
+ public TableRecordWriter(String path, TaskAttemptContext context) throws IOException {
+ Configuration conf = context.getConfiguration();
+ if(conf.getBoolean(BasicTableOutputFormat.IS_MULTI, false) == true) {
+ op = (org.apache.hadoop.zebra.mapreduce.ZebraOutputPartition)
+ ReflectionUtils.newInstance(BasicTableOutputFormat.getZebraOutputPartitionClass(context), conf);
+
+ }
+ Path [] paths = BasicTableOutputFormat.getOutputPaths(context);
+ inserter = new TableInserter[paths.length];
+ String inserterName = "part-" + context.getTaskAttemptID().getTaskID().getId();
+ for(int i = 0; i < paths.length; ++i) {
+ BasicTable.Writer writer =
+ new BasicTable.Writer(paths[i], conf);
+ this.inserter[i] = writer.getInserter( inserterName, true);
+ }
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException {
+ for(int i = 0; i < this.inserter.length; ++i) {
+ inserter[i].close();
+ }
+ }
+
+ @Override
+ public void write(BytesWritable key, Tuple value) throws IOException {
+ if(op != null ) {
+ int idx = op.getOutputPartition(key, value);
+ if(idx < 0 || (idx >= inserter.length)) {
+ throw new IllegalArgumentException("index returned by getOutputPartition is out of range");
+ }
+ inserter[idx].insert(key, value);
+ } else {
+ inserter[0].insert(key, value);
+ }
+ }
+
+}
+
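(With multiple outputs, TableRecordWriter.write() above routes each row through the user's ZebraOutputPartition. Below is a sketch of such a partitioner, assuming, per the class javadoc, that ZebraOutputPartition is an interface exposing getOutputPartition(key, value); the two-way split is purely illustrative.)

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.zebra.mapreduce.ZebraOutputPartition;
    import org.apache.pig.data.Tuple;

    public class TwoWayPartitioner implements ZebraOutputPartition {
      @Override
      public int getOutputPartition(BytesWritable key, Tuple value) {
        // Returns an index into the path list given to setMultipleOutputs();
        // it must stay within [0, numPaths) or TableRecordWriter.write() throws.
        return (key.hashCode() & Integer.MAX_VALUE) % 2;
      }
    }

It would then be registered with something like BasicTableOutputFormat.setMultipleOutputs(job, TwoWayPartitioner.class, pathA, pathB).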
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/CachedTableScanner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/CachedTableScanner.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/CachedTableScanner.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/CachedTableScanner.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Utility class for those interested in writing their own TableExpr classes.
+ * This class encapsulates a regular table scanner and changes the way keys
+ * and rows are accessed (with an internal cache).
+ */
+final class CachedTableScanner implements Closeable {
+ private BytesWritable key;
+ private Tuple row;
+ private boolean keyReady;
+ private boolean rowReady;
+ private int index;
+ private TableScanner scanner;
+
+ /**
+ * Constructor
+ *
+ * @param scanner
+ * The scanner to be encapsulated
+ * @throws IOException
+ */
+ public CachedTableScanner(TableScanner scanner, int index) throws IOException {
+ key = new BytesWritable();
+ row = TypesUtils.createTuple(Projection.getNumColumns(scanner.getProjection()));
+ keyReady = false;
+ rowReady = false;
+ this.index = index;
+ this.scanner = scanner;
+ }
+
+ /**
+ * Get the key at cursor.
+ *
+ * @return The key at cursor
+ * @throws IOException
+ */
+ public BytesWritable getKey() throws IOException {
+ if (!keyReady) {
+ scanner.getKey(key);
+ keyReady = true;
+ }
+ return key;
+ }
+
+ /**
+ * Get the value (Tuple) at cursor.
+ *
+ * @return the value at cursor.
+ * @throws IOException
+ */
+ public Tuple getValue() throws IOException {
+ if (!rowReady) {
+ scanner.getValue(row);
+ rowReady = true;
+ }
+ return row;
+ }
+
+ /**
+ * Get the table index in a union
+ *
+ * @return the table index in union
+ */
+ public int getIndex() {
+ return index;
+ }
+
+ /**
+ * Seek to a row whose key is greater than or equal to the input key.
+ *
+ * @param inKey
+ * the input key.
+ * @return true if an exact matching key is found. false otherwise.
+ * @throws IOException
+ */
+ public boolean seekTo(BytesWritable inKey) throws IOException {
+ boolean ret = scanner.seekTo(inKey);
+ reset();
+ return ret;
+ }
+
+ /**
+ * Advance the cursor.
+ *
+ * @return whether the cursor actually moves.
+ * @throws IOException
+ */
+ public boolean advance() throws IOException {
+ boolean ret = scanner.advance();
+ reset();
+ return ret;
+ }
+
+ private void reset() throws IOException {
+ row = TypesUtils.createTuple(Projection.getNumColumns(scanner.getProjection()));
+ keyReady = false;
+ rowReady = false;
+ }
+
+ /**
+ * Is cursor at end?
+ *
+ * @return Whether cursor is at the end.
+ * @throws IOException
+ */
+ public boolean atEnd() throws IOException {
+ return scanner.atEnd();
+ }
+
+ /**
+ * Get the projection of this scanner.
+ *
+ * @return The projection string of this scanner.
+ */
+ public String getProjection() {
+ return scanner.getProjection();
+ }
+
+ /**
+ * Get the projected schema
+ * @return The projected schema
+ */
+ public Schema getSchema() {
+ return scanner.getSchema();
+ }
+
+ /**
+ * Seek to the end of the scanner.
+ *
+ * @throws IOException
+ */
+ public void seekToEnd() throws IOException {
+ reset();
+ scanner.seekToEnd();
+ }
+
+ /**
+ * Close the scanner, release all resources.
+ */
+ @Override
+ public void close() throws IOException {
+ scanner.close();
+ }
+}
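(The caching contract above: getKey()/getValue() touch the wrapped scanner at most once per cursor position, and seekTo()/advance()/seekToEnd() invalidate the cache. A package-internal sketch, assuming the wrapped scanner is already open.)

    import java.io.IOException;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.zebra.io.TableScanner;
    import org.apache.pig.data.Tuple;

    // Same package as CachedTableScanner, which is package-private.
    class CachedScanSketch {
      static void scanAll(TableScanner raw) throws IOException {
        CachedTableScanner cached = new CachedTableScanner(raw, 0 /* index in a union */);
        while (!cached.atEnd()) {
          BytesWritable key = cached.getKey(); // first call fetches, repeats hit the cache
          Tuple row = cached.getValue();       // same caching for the row
          // ... consume key and row ...
          cached.advance();                    // moves the cursor and clears both caches
        }
        cached.close();
      }
    }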
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/CompositeTableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/CompositeTableExpr.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/CompositeTableExpr.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/CompositeTableExpr.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.schema.Schema;
+
+/**
+ * An intermediate class to build concrete composite table expression classes.
+ * Lengthy names are chosen to avoid naming conflicts.
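+ *
+ * <p>A hedged sketch of a concrete subclass (the class name is hypothetical):
+ *
+ * <pre>
+ * class MyCompositeExpr extends CompositeTableExpr {
+ *   public MyCompositeExpr() {
+ *     // public no-arg constructor, needed by TableExpr.parse()
+ *   }
+ *
+ *   public MyCompositeExpr(TableExpr... tables) {
+ *     addCompositeTables(tables);
+ *   }
+ * }
+ * </pre>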
+ */
+abstract class CompositeTableExpr extends TableExpr {
+ public static final int INDENT_UNIT = 2;
+ protected ArrayList<TableExpr> composite;
+
+ protected CompositeTableExpr() {
+ composite = new ArrayList<TableExpr>();
+ }
+
+ protected CompositeTableExpr(int n) {
+ composite = new ArrayList<TableExpr>(n);
+ for (int i = 0; i < n; ++i) {
+ composite.add(null);
+ }
+ }
+
+ protected CompositeTableExpr addCompositeTable(TableExpr expr) {
+ composite.add(expr);
+ return this;
+ }
+
+ protected CompositeTableExpr addCompositeTables(TableExpr[] exprs) {
+ composite.ensureCapacity(composite.size() + exprs.length);
+ for (TableExpr expr : exprs) {
+ composite.add(expr);
+ }
+ return this;
+ }
+
+ protected CompositeTableExpr addCompositeTables(
+ Collection<? extends TableExpr> exprs) {
+ composite.addAll(exprs);
+ return this;
+ }
+
+ protected CompositeTableExpr setCompositeTable(int i, TableExpr expr) {
+ composite.set(i, expr);
+ return this;
+ }
+
+ protected List<TableExpr> getCompositeTables() {
+ return composite;
+ }
+
+ protected TableExpr getCompositeTable(int i) {
+ TableExpr expr = composite.get(i);
+ if (expr == null) {
+ throw new RuntimeException("Incomplete N-ary expression");
+ }
+
+ return expr;
+ }
+
+ @Override
+ protected CompositeTableExpr decodeParam(StringReader in) throws IOException {
+ composite.clear();
+ int n = TableExprUtils.decodeInt(in);
+ composite.ensureCapacity(n);
+ for (int i = 0; i < n; ++i) {
+ TableExpr expr = TableExpr.parse(in);
+ composite.add(expr);
+ }
+
+ return this;
+ }
+
+ @Override
+ protected CompositeTableExpr encodeParam(StringBuilder out) {
+ int n = composite.size();
+ TableExprUtils.encodeInt(out, n);
+ for (int i = 0; i < n; ++i) {
+ getCompositeTable(i).encode(out);
+ }
+
+ return this;
+ }
+
+ @Override
+ public List<LeafTableInfo> getLeafTables(String projection) {
+ ArrayList<LeafTableInfo> ret = new ArrayList<LeafTableInfo>();
+
+ int n = composite.size();
+ for (int i = 0; i < n; ++i) {
+ ret.addAll(getCompositeTable(i).getLeafTables(projection));
+ }
+
+ return ret;
+ }
+
+ protected static class RowMappingEntry {
+ final int rowIndex;
+ final int fieldIndex;
+
+ public RowMappingEntry(int ri, int fi) {
+ rowIndex = ri;
+ fieldIndex = fi;
+ }
+
+ public int getRowIndex() {
+ return rowIndex;
+ }
+
+ public int getFieldIndex() {
+ return fieldIndex;
+ }
+ }
+
+ protected static class InferredProjection {
+ /**
+ * Projections on each table in the composite. It has the same dimension
+ * as the composite.
+ */
+ final Schema[] subProjections;
+
+ /**
+ * The actual (adjusted) input projection.
+ */
+ final Schema projection;
+
+ /**
+ * For each column in the input projection, the corresponding sub-table
+ * (row index) and projected column (field index).
+ */
+ final RowMappingEntry[] colMapping;
+
+ InferredProjection(List<String>[] subProj, String[] proj,
+ Map<String, RowMappingEntry> colMap) throws ParseException {
+ subProjections = new Schema[subProj.length];
+ for (int i = 0; i < subProj.length; ++i) {
+ List<String> subProjection = subProj[i];
+ subProjections[i] =
+ new Schema(subProjection.toArray(new String[subProjection.size()]));
+ }
+
+ projection = new Schema(proj);
+
+ this.colMapping = new RowMappingEntry[proj.length];
+
+ for (int i = 0; i < proj.length; ++i) {
+ colMapping[i] = colMap.get(proj[i]);
+ }
+ }
+
+ public Schema[] getSubProjections() {
+ return subProjections;
+ }
+
+ public RowMappingEntry[] getColMapping() {
+ return colMapping;
+ }
+
+ public Schema getProjection() {
+ return projection;
+ }
+ }
+
+ /**
+ * Given the input projection, infer the projections that should be applied
+ * to the individual tables in the composite.
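+ *
+ * <p>An illustrative example (column names hypothetical): if table 0 in the
+ * composite has columns <code>a, b</code> and table 1 has column
+ * <code>c</code>, the projection <code>"a,c"</code> yields sub-projection
+ * <code>{a}</code> on table 0 and <code>{c}</code> on table 1, with column
+ * mapping <code>a -&gt; (row 0, field 0)</code> and
+ * <code>c -&gt; (row 1, field 0)</code>.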
+ *
+ * @param projection
+ * The expected projection
+ * @param conf
+ * The configuration
+ * @return The inferred projections with the associated column mapping.
+ * @throws IOException
+ * @throws ParseException
+ */
+ @SuppressWarnings("unchecked")
+ protected InferredProjection inferProjection(String projection,
+ Configuration conf)
+ throws IOException, ParseException {
+ int n = composite.size();
+ ArrayList<String>[] subProjections = new ArrayList[n];
+ for (int i = 0; i < n; ++i) {
+ subProjections[i] = new ArrayList<String>();
+ }
+
+ String[] columns;
+ LinkedHashMap<String, Integer> colNameMap =
+ new LinkedHashMap<String, Integer>();
+ TreeMap<String, RowMappingEntry> colMapping =
+ new TreeMap<String, RowMappingEntry>();
+
+ /*
+ * Build a map from every column name to the index of the composite table
+ * it belongs to (-1 marks names that appear in more than one table).
+ */
+ for (int i = 0; i < n; ++i) {
+ String[] cols = getCompositeTable(i).getSchema(conf).getColumns();
+ for (int j = 0; j < cols.length; ++j) {
+ if (colNameMap.get(cols[j]) != null) {
+ colNameMap.put(cols[j], -1); // special marking on duplicated names.
+ }
+ else {
+ colNameMap.put(cols[j], i);
+ }
+ }
+ }
+
+ if (projection == null) {
+ Set<String> keySet = colNameMap.keySet();
+ columns = keySet.toArray(new String[keySet.size()]);
+ }
+ else {
+ columns = projection.trim().split(Schema.COLUMN_DELIMITER);
+ }
+
+ for (int i = 0; i < columns.length; ++i) {
+ String col = columns[i];
+
+ if (colMapping.get(col) != null) {
+ throw new IllegalArgumentException(
+ "Duplicate column names in projection");
+ }
+
+ Integer rowIndex = colNameMap.get(col);
+ if (rowIndex == null) {
+ colMapping.put(col, null);
+ }
+ else {
+ if (rowIndex < 0) {
+ // The column name appears in more than one table in composite.
+ throw new RuntimeException("Ambiguous column in projection: "
+ + col);
+ }
+ subProjections[rowIndex].add(columns[i]);
+ colMapping.put(col, new RowMappingEntry(rowIndex,
+ subProjections[rowIndex].size() - 1));
+ }
+ }
+
+ return new InferredProjection(subProjections, columns, colMapping);
+ }
+
+ @Override
+ public Schema getSchema(Configuration conf) throws IOException {
+ Schema result = new Schema();
+ for (TableExpr e : composite) {
+ try {
+ result.unionSchema(e.getSchema(conf));
+ } catch (ParseException exc) {
+ throw new IOException("Schema parsing failed: " + exc.getMessage());
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public boolean sortedSplitCapable() {
+ for (TableExpr e : composite) {
+ if (!e.sortedSplitCapable()) return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ protected void dumpInfo(PrintStream ps, Configuration conf, int indent) throws IOException
+ {
+ for (TableExpr e : composite) {
+ e.dumpInfo(ps, conf, indent+INDENT_UNIT);
+ }
+ }
+}
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/NullScanner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/NullScanner.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/NullScanner.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/NullScanner.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.pig.data.Tuple;
+
+/**
+ * A scanner that contains no rows.
+ */
+class NullScanner implements TableScanner {
+ String projection;
+
+ public NullScanner(String projection) {
+ this.projection = projection;
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ return false;
+ }
+
+ @Override
+ public boolean atEnd() throws IOException {
+ return true;
+ }
+
+ @Override
+ public String getProjection() {
+ return projection;
+ }
+
+ @Override
+ public Schema getSchema() {
+ Schema result;
+ try {
+ result = Projection.toSchema(projection);
+ } catch (ParseException e) {
+ throw new AssertionError("Invalid Projection: " + e.getMessage());
+ }
+ return result;
+ }
+
+ @Override
+ public void getKey(BytesWritable key) throws IOException {
+ throw new EOFException("No more rows to read");
+ }
+
+ @Override
+ public void getValue(Tuple row) throws IOException {
+ throw new EOFException("No more rows to read");
+ }
+
+ @Override
+ public boolean seekTo(BytesWritable key) throws IOException {
+ return false;
+ }
+
+ @Override
+ public void seekToEnd() throws IOException {
+ // Do nothing
+ }
+
+ @Override
+ public void close() throws IOException {
+ // Do nothing
+ }
+}
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.PrintStream;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.schema.Schema;
+
+/**
+ * Table Expression - expression to describe an input table.
+ */
+abstract class TableExpr {
+ private boolean sorted = false;
+ /**
+ * Factory method to create a TableExpr from a string.
+ *
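+ * <p>A round-trip sketch (assumes some concrete subclass
+ * <code>MyTableExpr</code> with a public no-argument constructor; the name
+ * is hypothetical):
+ *
+ * <pre>
+ * StringBuilder sb = new StringBuilder();
+ * new MyTableExpr().encode(sb);  // writes class name + encoded parameters
+ * TableExpr copy = TableExpr.parse(new StringReader(sb.toString()));
+ * </pre>
+ *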
+ * @param in
+ * The string stream that is pointed at the beginning of the table
+ * expression string to be parsed.
+ * @return The instantiated TableExpr object. The string stream will move past
+ * the table expression string.
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ public static TableExpr parse(StringReader in) throws IOException {
+ String clsName = TableExprUtils.decodeString(in);
+ try {
+ Class<? extends TableExpr> tblExprCls =
+ (Class<? extends TableExpr>) Class.forName(clsName);
+ return tblExprCls.newInstance().decodeParam(in);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Failed to create TableExpr: " + e.toString());
+ }
+ }
+
+ /**
+ * Encode the expression to a string stream.
+ *
+ * @param out
+ * The string stream that we should encode the expression into.
+ */
+ public final void encode(StringBuilder out) {
+ TableExprUtils.encodeString(out, getClass().getName());
+ encodeParam(out);
+ }
+
+ /**
+ * Decode the parameters of the expression from the input stream.
+ *
+ * @param in
+ * The string stream that is pointed at the beginning of the encoded
+ * parameters.
+ * @return Reference to itself, with its parameters set from the string
+ * stream. The string stream will move past the encoded parameters
+ * for this expression.
+ */
+ protected abstract TableExpr decodeParam(StringReader in) throws IOException;
+
+ /**
+ * Encode the parameters of the expression into the string stream.
+ *
+ * @param out
+ * The string stream that we write encoded parameters into.
+ * @return Reference to itself.
+ */
+ protected abstract TableExpr encodeParam(StringBuilder out);
+
+ /**
+ * Get a TableScanner from the TableExpr object. This method only needs to be
+ * implemented by table expressions that support sorted split.
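+ *
+ * <p>A hedged call sketch (assumes <code>expr</code> is an expression that
+ * supports sorted split and that the projection is a comma-separated column
+ * list):
+ *
+ * <pre>
+ * // null begin/end keys scan the whole key range
+ * TableScanner scanner = expr.getScanner(null, null, "a,b", conf);
+ * </pre>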
+ *
+ * @param begin
+ * the Begin key (inclusive). Can be null, meaning starting from the
+ * first row of the table.
+ * @param end
+ * the End key (exclusive). Can be null, meaning scan to the last row
+ * of the table.
+ * @param projection
+ * The projection schema. It should never be null.
+ * @see Schema
+ * @return A TableScanner object.
+ */
+ public TableScanner getScanner(BytesWritable begin,
+ BytesWritable end, String projection, Configuration conf)
+ throws IOException {
+ return null;
+ }
+
+ /**
+ * Get a scanner with an unsorted split.
+ *
+ * @param split
+ * The range split.
+ * @param projection
+ * The projection schema. It should never be null.
+ * @param conf
+ * The configuration
+ * @return A table scanner.
+ * @throws IOException
+ */
+ public TableScanner getScanner(UnsortedTableSplit split, String projection,
+ Configuration conf) throws IOException, ParseException {
+ BasicTable.Reader reader =
+ new BasicTable.Reader(new Path(split.getPath()), conf);
+ reader.setProjection(projection);
+ return reader.getScanner(split.getSplit(), true);
+ }
+
+ /**
+ * Get a scanner with a row split.
+ *
+ * @param split
+ * The row split.
+ * @param projection
+ * The projection schema. It should never be null.
+ * @param conf
+ * The configuration
+ * @return A table scanner.
+ * @throws IOException
+ */
+ public TableScanner getScanner(RowTableSplit split, String projection,
+ Configuration conf) throws IOException, ParseException {
+ BasicTable.Reader reader =
+ new BasicTable.Reader(new Path(split.getPath()), conf);
+ reader.setProjection(projection);
+ return reader.getScanner(true, split.getSplit());
+ }
+
+ /**
+ * A leaf table corresponds to a materialized table. It is represented by the
+ * path to the BasicTable and the projection.
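+ * For instance (path and projection hypothetical):
+ * <code>new LeafTableInfo(new Path("/user/tables/t1"), "a,b")</code>.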
+ */
+ public static final class LeafTableInfo {
+ private final Path path;
+ private final String projection;
+
+ public LeafTableInfo(Path path, String projection) {
+ this.path = path;
+ this.projection = projection;
+ }
+
+ public Path getPath() {
+ return path;
+ }
+
+ public String getProjection() {
+ return projection;
+ }
+ }
+
+ /**
+ * Get the information of all leaf tables that will be accessed by this table
+ * expression.
+ *
+ * @param projection
+ * The projection that is applied to the table expression.
+ */
+ public abstract List<LeafTableInfo> getLeafTables(
+ String projection);
+
+ /**
+ * Get the schema of the table.
+ *
+ * @param conf
+ * The configuration object.
+ */
+ public abstract Schema getSchema(Configuration conf) throws IOException;
+
+ /**
+ * Does this expression require sorted split? If so, all underlying
+ * BasicTables must be sorted and we split by key sampling. If this method
+ * returns true, sortedSplitCapable() is expected to return true as well.
+ *
+ * @return Whether this expression may only be split by key.
+ */
+ public boolean sortedSplitRequired() {
+ return sorted;
+ }
+
+ /**
+ * Set the requirement for sorted split.
+ */
+ public void setSortedSplit() {
+ sorted = true;
+ }
+
+ /**
+ * Is this expression capable of sorted split? If false, getScanner() should
+ * return null; otherwise, getScanner() should return a valid Scanner object.
+ *
+ * This method should be overridden by subclasses that are capable of sorted
+ * split. Note that it should not perform any actual I/O operation, such as
+ * checking whether the leaf tables (BasicTables) are in fact sorted. If this
+ * method returns true while at least one of the leaf tables is not sorted,
+ * an {@link IOException} will be thrown at split-generation time.
+ *
+ * @return Whether the "table view" represented by the expression is sorted
+ * and is thus splittable by key (sorted split).
+ */
+ public boolean sortedSplitCapable() {
+ return false;
+ }
+
+ /**
+ * Dump table info.
+ */
+ public final void dumpInfo(PrintStream ps, Configuration conf) throws IOException
+ {
+ dumpInfo(ps, conf, 0);
+ }
+
+ /**
+ * Dump table info with indentation.
+ */
+ protected abstract void dumpInfo(PrintStream ps, Configuration conf, int indent) throws IOException;
+}
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExprUtils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExprUtils.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExprUtils.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExprUtils.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.IOException;
+import java.io.StringReader;
+
+/**
+ * Utility methods for people interested in writing their own TableExpr classes.
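+ *
+ * <p>The encoding, as implemented below: a long or int is written as its
+ * decimal digits followed by a colon (a bare colon encodes null), and a
+ * string is written as its length followed by its characters. For example,
+ * <code>encodeString(sb, "hello")</code> appends <code>"5:hello"</code>.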
+ */
+class TableExprUtils {
+ private TableExprUtils() {
+ // prevent instantiation
+ }
+
+ /**
+ * Encode a string into a StringBuilder.
+ *
+ * @param out
+ * output string builder.
+ * @param s
+ * String to be encoded
+ */
+ public static void encodeString(StringBuilder out, String s) {
+ if (s == null) {
+ encodeInt(out, null);
+ return;
+ }
+
+ encodeInt(out, s.length());
+ out.append(s);
+ }
+
+ /**
+ * Decode a string previously encoded through encodeString.
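+ * For example, <code>decodeString(new StringReader("5:hello"))</code>
+ * returns <code>"hello"</code>; a reader positioned at a bare
+ * <code>":"</code> yields <code>null</code>.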
+ *
+ * @param in
+ * The input StringReader.
+ * @return The decoded string object.
+ * @throws IOException
+ */
+ public static String decodeString(StringReader in) throws IOException {
+ Integer len = decodeInt(in);
+ if (len == null) {
+ return null;
+ }
+
+ char[] chars = new char[len];
+ if (in.read(chars) != len) {
+ throw new IOException("Bad encoded string");
+ }
+ return new String(chars);
+ }
+
+ public static void encodeLong(StringBuilder out, Long l) {
+ if (l != null) {
+ out.append(l);
+ }
+ out.append(':');
+ }
+
+ public static Long decodeLong(StringReader in) throws IOException {
+ int c = in.read();
+ if (c == ':') {
+ return null;
+ }
+ if (!Character.isDigit(c)) {
+ throw new IllegalArgumentException("Bad encoded string");
+ }
+ StringBuilder sb = new StringBuilder();
+ sb.append((char) c);
+
+ boolean colonSeen = false;
+ while ((c = in.read()) != -1) {
+ if (c == ':') {
+ colonSeen = true;
+ break;
+ }
+ if (!Character.isDigit(c)) {
+ break;
+ }
+ sb.append((char) c);
+ }
+ if (!colonSeen) {
+ throw new IllegalArgumentException("Bad encoded string");
+ }
+ return Long.valueOf(sb.toString());
+ }
+
+ public static void encodeInt(StringBuilder out, Integer i) {
+ if (i == null) {
+ encodeLong(out, null);
+ }
+ else {
+ encodeLong(out, (long) i.intValue());
+ }
+ }
+
+ public static Integer decodeInt(StringReader in) throws IOException {
+ Long l = decodeLong(in);
+ if (l == null) return null;
+ return (int) l.longValue();
+ }
+}