You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ya...@apache.org on 2010/03/28 14:16:54 UTC
svn commit: r928385 [1/3] - in
/hadoop/pig/branches/branch-0.7/contrib/zebra: ./
src/java/org/apache/hadoop/zebra/mapreduce/
src/java/org/apache/hadoop/zebra/pig/ src/test/org/apache/hadoop/zebra/
src/test/org/apache/hadoop/zebra/mapred/ src/test/org/a...
Author: yanz
Date: Sun Mar 28 12:16:54 2010
New Revision: 928385
URL: http://svn.apache.org/viewvc?rev=928385&view=rev
Log:
PIG-1306 Support of locally sorted input splits (yanz)
Modified:
hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/BaseTestCase.java
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ToolTestComparator.java
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ToolTestComparator.java
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjection.java
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveSimple.java
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveUnion.java
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveVariableTable.java
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnion.java
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnionMergeJoin.java
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnionSourceTableProj.java
Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt Sun Mar 28 12:16:54 2010
@@ -16,6 +16,8 @@ Trunk (unreleased changes)
IMPROVEMENTS
+ PIG-1306 Support of locally sorted input splits (yanz)
+
PIG-1268 Need an ant target that runs all pig-related tests in Zebra (xuefuz via yanz)
PIG-1207 Data sanity check should be performed at the end of writing instead of later at query time (yanz)
Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java Sun Mar 28 12:16:54 2010
@@ -29,6 +29,8 @@ import org.apache.hadoop.zebra.io.BasicT
import org.apache.hadoop.zebra.io.TableScanner;
import org.apache.hadoop.zebra.parser.ParseException;
import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.pig.data.Tuple;
/**
* Table expression for reading a BasicTable.
@@ -117,6 +119,12 @@ class BasicTableExpr extends TableExpr {
}
@Override
+ public TableScanner getScanner(RowTableSplit split, String projection,
+ Configuration conf) throws IOException, ParseException {
+ return new BasicTableScanner(split, projection, conf);
+ }
+
+ @Override
public Schema getSchema(Configuration conf) throws IOException {
return BasicTable.Reader.getSchema(path, conf);
}
@@ -131,4 +139,75 @@ class BasicTableExpr extends TableExpr {
{
BasicTable.dumpInfo(path.toString(), ps, conf, indent);
}
+
+ /**
+ * Basic Table Scanner
+ */
+ class BasicTableScanner implements TableScanner {
+ private int tableIndex = -1;
+ private Integer[] virtualColumnIndices = null;
+ private TableScanner scanner = null;
+
+ BasicTableScanner(RowTableSplit split, String projection,
+ Configuration conf) throws IOException, ParseException, ParseException {
+ tableIndex = split.getTableIndex();
+ virtualColumnIndices = Projection.getVirtualColumnIndices(projection);
+ BasicTable.Reader reader =
+ new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), conf);
+ reader.setProjection(projection);
+ scanner = reader.getScanner(true, split.getSplit());
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ return scanner.advance();
+ }
+
+ @Override
+ public boolean atEnd() throws IOException {
+ return scanner.atEnd();
+ }
+
+ @Override
+ public Schema getSchema() {
+ return scanner.getSchema();
+ }
+
+ @Override
+ public void getKey(BytesWritable key) throws IOException {
+ scanner.getKey(key);
+ }
+
+ @Override
+ public void getValue(Tuple row) throws IOException {
+ scanner.getValue(row);
+ if (virtualColumnIndices != null)
+ {
+ for (int i = 0; i < virtualColumnIndices.length; i++)
+ {
+ row.set(virtualColumnIndices[i], tableIndex);
+ }
+ }
+ }
+
+ @Override
+ public boolean seekTo(BytesWritable key) throws IOException {
+ return scanner.seekTo(key);
+ }
+
+ @Override
+ public void seekToEnd() throws IOException {
+ scanner.seekToEnd();
+ }
+
+ @Override
+ public void close() throws IOException {
+ scanner.close();
+ }
+
+ @Override
+ public String getProjection() {
+ return scanner.getProjection();
+ }
+ }
}
Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java Sun Mar 28 12:16:54 2010
@@ -27,7 +27,6 @@ import org.apache.hadoop.io.BytesWritabl
import org.apache.hadoop.zebra.io.BasicTable;
import org.apache.hadoop.zebra.io.TableScanner;
import org.apache.hadoop.zebra.parser.ParseException;
-import org.apache.hadoop.zebra.types.Projection;
import org.apache.hadoop.zebra.schema.Schema;
/**
@@ -132,10 +131,10 @@ abstract class TableExpr {
}
/**
- * Get a scanner with an unsorted split.
+ * Get a scanner with a row split.
*
* @param split
- * The range split.
+ * The row split.
* @param projection
* The projection schema. It should never be null.
* @param conf
@@ -144,11 +143,8 @@ abstract class TableExpr {
* @throws IOException
*/
public TableScanner getScanner(RowTableSplit split, String projection,
- Configuration conf) throws IOException, ParseException, ParseException {
- BasicTable.Reader reader =
- new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), conf);
- reader.setProjection(projection);
- return reader.getScanner(true, split.getSplit());
+ Configuration conf) throws IOException, ParseException {
+ return null;
}
/**
Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java Sun Mar 28 12:16:54 2010
@@ -144,6 +144,12 @@ import org.apache.pig.data.Tuple;
* </UL>
*/
public class TableInputFormat extends InputFormat<BytesWritable, Tuple> {
+ public enum SplitMode {
+ UNSORTED, /* Data is not sorted. Default split mode*/
+ LOCALLY_SORTED, /* Output by each mapper is sorted */
+ GLOBALLY_SORTED /* Output is locally sorted and the key ranges do not overlap, not even at the boundaries */
+ };
+
static Log LOG = LogFactory.getLog(TableInputFormat.class);
private static final String INPUT_EXPR = "mapreduce.lib.table.input.expr";
@@ -151,6 +157,10 @@ public class TableInputFormat extends In
private static final String INPUT_SORT = "mapreduce.lib.table.input.sort";
static final String INPUT_FE = "mapreduce.lib.table.input.fe";
static final String INPUT_DELETED_CGS = "mapreduce.lib.table.input.deleted_cgs";
+ private static final String INPUT_SPLIT_MODE = "mapreduce.lib.table.input.split_mode";
+ private static final String UNSORTED = "unsorted";
+ private static final String GLOBALLY_SORTED = "globally_sorted";
+ private static final String LOCALLY_SORTED = "locally_sorted";
static final String DELETED_CG_SEPARATOR_PER_UNION = ";";
/**
@@ -316,16 +326,28 @@ public class TableInputFormat extends In
return null;
}
- /**
- * Set requirement for sorted table
- *
- *@param conf
- * Configuration object.
- */
- private static void setSorted(Configuration conf) {
+ private static boolean globalOrderingRequired(JobContext jobContext)
+ {
+ Configuration conf = jobContext.getConfiguration();
+ String result = conf.get(INPUT_SPLIT_MODE, UNSORTED);
+ return result.equalsIgnoreCase(GLOBALLY_SORTED);
+ }
+
+ private static void setSorted(JobContext jobContext) {
+ Configuration conf = jobContext.getConfiguration();
conf.setBoolean(INPUT_SORT, true);
}
+ private static void setSorted(JobContext jobContext, SplitMode sm)
+ {
+ setSorted(jobContext);
+ Configuration conf = jobContext.getConfiguration();
+ if (sm == SplitMode.GLOBALLY_SORTED)
+ conf.set(INPUT_SPLIT_MODE, GLOBALLY_SORTED);
+ else if (sm == SplitMode.LOCALLY_SORTED)
+ conf.set(INPUT_SPLIT_MODE, LOCALLY_SORTED);
+ }
+
/**
* Get the SortInfo object regarding a Zebra table
*
@@ -368,13 +390,33 @@ public class TableInputFormat extends In
/**
* Requires sorted table or table union
*
+ * @deprecated
+ *
* @param jobContext
* JobContext object.
* @param sortInfo
* ZebraSortInfo object containing sorting information.
*
*/
- public static void requireSortedTable(JobContext jobContext, ZebraSortInfo sortInfo) throws IOException {
+
+ public static void requireSortedTable(JobContext jobContext, ZebraSortInfo sortInfo) throws IOException {
+ setSplitMode(jobContext, SplitMode.GLOBALLY_SORTED, sortInfo);
+ }
+
+ /**
+ *
+ * @param jobContext
+ * JobContext object
+ * @param sm
+ * Split mode: unsorted, globally sorted, locally sorted. Default is unsorted
+ * @param sortInfo
+ * ZebraSortInfo object containing sorting information. Will be ignored if
+ * the split mode is null or unsorted
+ * @throws IOException
+ */
+ public static void setSplitMode(JobContext jobContext, SplitMode sm, ZebraSortInfo sortInfo) throws IOException {
+ if (sm == null || sm == SplitMode.UNSORTED)
+ return;
TableExpr expr = getInputExpr( jobContext );
Configuration conf = jobContext.getConfiguration();
String comparatorName = null;
@@ -428,8 +470,7 @@ public class TableInputFormat extends In
}
}
}
- // need key range input splits for sorted table union
- setSorted(conf);
+ setSorted(jobContext, sm);
}
/**
@@ -599,6 +640,7 @@ public class TableInputFormat extends In
SortedTableSplit split = new SortedTableSplit(beginB, endB, bd, conf);
splits.add(split);
}
+ LOG.info("getSplits : returning " + splits.size() + " sorted splits.");
return splits;
}
@@ -716,7 +758,8 @@ public class TableInputFormat extends In
FileStatus[] fileStatuses = fs.listStatus(globStat.getPath(), inputFilter);
// reorder according to CG index
BasicTable.Reader reader = readers.get(index);
- reader.rearrangeFileIndices(fileStatuses);
+ if (fileStatuses.length > 1)
+ reader.rearrangeFileIndices(fileStatuses);
for(FileStatus stat: fileStatuses) {
if (stat != null)
result.add(stat);
@@ -758,6 +801,7 @@ public class TableInputFormat extends In
boolean first = true;
PathFilter filter = null;
List<BasicTable.Reader> realReaders = new ArrayList<BasicTable.Reader>();
+ int[] realReaderIndices = new int[readers.size()];
for (int i = 0; i < readers.size(); ++i) {
BasicTable.Reader reader = readers.get(i);
@@ -766,7 +810,8 @@ public class TableInputFormat extends In
/* We can create input splits only if there does exist a valid column group for split.
* Otherwise, we do not create input splits. */
- if (splitCGIndex >= 0) {
+ if (splitCGIndex >= 0) {
+ realReaderIndices[realReaders.size()] = i;
realReaders.add(reader);
if (first)
{
@@ -862,11 +907,13 @@ public class TableInputFormat extends In
if (splitLen > 0)
batches[++numBatches] = splitLen;
- List<RowSplit> subSplits = reader.rowSplit(starts, lengths, paths, splitCGIndex, batches, numBatches);
+ List<RowSplit> subSplits = reader.rowSplit(starts, lengths, paths, splitCGIndex,
+ batches, numBatches);
+ int realTableIndex = realReaderIndices[tableIndex];
for (Iterator<RowSplit> it = subSplits.iterator(); it.hasNext();) {
RowSplit subSplit = it.next();
- RowTableSplit split = new RowTableSplit(reader, subSplit, conf);
+ RowTableSplit split = new RowTableSplit(reader, subSplit, realTableIndex, conf);
ret.add(split);
}
}
@@ -884,7 +931,7 @@ public class TableInputFormat extends In
return getSplits( jobContext, false );
}
- private List<InputSplit> getSplits(JobContext jobContext, boolean singleSplit) throws IOException {
+ List<InputSplit> getSplits(JobContext jobContext, boolean singleSplit) throws IOException {
Configuration conf = jobContext.getConfiguration();
TableExpr expr = getInputExpr( conf );
if( getSorted(conf) ) {
@@ -938,9 +985,20 @@ public class TableInputFormat extends In
return new ArrayList<InputSplit>();
}
- return sorted ?
- singleSplit ? getSortedSplits( conf, 1, expr, readers, status) : getSortedSplits(conf, -1, expr, readers, status) :
- getRowSplits( conf, expr, readers, status);
+ List<InputSplit> result;
+ if (sorted)
+ {
+ if (singleSplit)
+ result = getSortedSplits( conf, 1, expr, readers, status);
+ else if (globalOrderingRequired(jobContext))
+ result = getSortedSplits(conf, -1, expr, readers, status);
+ else
+ result = getRowSplits( conf, expr, readers, status);
+ } else
+ result = getRowSplits( conf, expr, readers, status);
+
+ return result;
+
} catch (ParseException e) {
throw new IOException("Projection parsing failed : "+e.getMessage());
} finally {
@@ -1166,15 +1224,17 @@ class RowTableSplit extends InputSplit i
/**
*
*/
-String path = null;
+ String path = null;
+ int tableIndex = 0;
RowSplit split = null;
String[] hosts = null;
long length = 1;
- public RowTableSplit(Reader reader, RowSplit split, Configuration conf)
+ public RowTableSplit(Reader reader, RowSplit split, int tableIndex, Configuration conf)
throws IOException {
this.path = reader.getPath();
this.split = split;
+ this.tableIndex = tableIndex;
BlockDistribution dataDist = reader.getBlockDistribution(split);
if (dataDist != null) {
length = dataDist.getLength();
@@ -1199,6 +1259,7 @@ String path = null;
@Override
public void readFields(DataInput in) throws IOException {
+ tableIndex = WritableUtils.readVInt(in);
path = WritableUtils.readString(in);
int bool = WritableUtils.readVInt(in);
if (bool == 1) {
@@ -1214,6 +1275,7 @@ String path = null;
@Override
public void write(DataOutput out) throws IOException {
+ WritableUtils.writeVInt(out, tableIndex);
WritableUtils.writeString(out, path);
if (split == null) {
WritableUtils.writeVInt(out, 0);
@@ -1233,4 +1295,8 @@ String path = null;
public RowSplit getSplit() {
return split;
}
+
+ public int getTableIndex() {
+ return tableIndex;
+ }
}
Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java Sun Mar 28 12:16:54 2010
@@ -53,19 +53,18 @@ public class TableRecordReader extends R
*/
public TableRecordReader(TableExpr expr, String projection,
InputSplit split, JobContext jobContext) throws IOException, ParseException {
- Configuration conf = jobContext.getConfiguration();
- if (expr.sortedSplitRequired()) {
+ Configuration conf = jobContext.getConfiguration();
+ if (split instanceof RowTableSplit) {
+ RowTableSplit rowSplit = (RowTableSplit) split;
+ if ((!expr.sortedSplitRequired() || rowSplit.getTableIndex() == -1) &&
+ Projection.getVirtualColumnIndices(projection) != null)
+ throw new IllegalArgumentException("virtual column requires union of multiple sorted tables");
+ scanner = expr.getScanner(rowSplit, projection, conf);
+ } else {
SortedTableSplit tblSplit = (SortedTableSplit) split;
scanner =
expr.getScanner(tblSplit.getBegin(), tblSplit.getEnd(), projection,
conf);
- } else if (split != null && split instanceof RowTableSplit) {
- RowTableSplit rowSplit = (RowTableSplit) split;
- scanner = expr.getScanner(rowSplit, projection, conf);
- }
- else {
- UnsortedTableSplit tblSplit = (UnsortedTableSplit) split;
- scanner = expr.getScanner(tblSplit, projection, conf);
}
}
Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java Sun Mar 28 12:16:54 2010
@@ -29,7 +29,6 @@ import java.util.concurrent.PriorityBloc
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.zebra.io.BasicTable;
-import org.apache.hadoop.zebra.io.BasicTableStatus;
import org.apache.hadoop.zebra.io.TableScanner;
import org.apache.hadoop.zebra.parser.ParseException;
import org.apache.hadoop.zebra.types.Projection;
@@ -157,6 +156,13 @@ class TableUnionExpr extends CompositeTa
throw new IllegalArgumentException("virtual column requires union of multiple tables");
return new SortedTableUnionScanner(scanners, Projection.getVirtualColumnIndices(projection));
}
+
+ @Override
+ public TableScanner getScanner(RowTableSplit split, String projection,
+ Configuration conf) throws IOException, ParseException {
+ BasicTableExpr expr = (BasicTableExpr) composite.get(split.getTableIndex());
+ return expr.getScanner(split, projection, conf);
+ }
}
/**
Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java Sun Mar 28 12:16:54 2010
@@ -38,6 +38,8 @@ import org.apache.hadoop.mapreduce.lib.i
import org.apache.hadoop.zebra.io.BasicTable;
import org.apache.hadoop.zebra.mapreduce.TableInputFormat;
import org.apache.hadoop.zebra.mapreduce.TableRecordReader;
+import org.apache.hadoop.zebra.mapreduce.TableInputFormat.SplitMode;
+import org.apache.hadoop.zebra.mapreduce.ZebraSortInfo;
import org.apache.hadoop.zebra.parser.ParseException;
import org.apache.hadoop.zebra.schema.ColumnType;
import org.apache.hadoop.zebra.schema.Schema;
@@ -57,15 +59,17 @@ import org.apache.pig.impl.logicalLayer.
import org.apache.pig.impl.util.UDFContext;
import org.apache.hadoop.zebra.pig.comparator.*;
import org.apache.pig.IndexableLoadFunc;
+import org.apache.pig.CollectableLoadFunc;
/**
* Pig IndexableLoadFunc and Slicer for Zebra Table
*/
public class TableLoader extends LoadFunc implements LoadMetadata, LoadPushDown,
- IndexableLoadFunc{
+ IndexableLoadFunc, CollectableLoadFunc {
static final Log LOG = LogFactory.getLog(TableLoader.class);
private static final String UDFCONTEXT_PROJ_STRING = "zebra.UDFContext.projectionString";
+ private static final String UDFCONTEXT_GLOBAL_SORTING = "zebra.UDFContext.globalSorting";
private String projectionString;
@@ -170,14 +174,19 @@ public class TableLoader extends LoadFun
* @throws IOException
*/
private void setProjection(Job job) throws IOException {
+ Properties properties = UDFContext.getUDFContext().getUDFProperties(
+ this.getClass(), new String[]{ udfContextSignature } );
+ boolean requireGlobalOrder = "true".equals(properties.getProperty( UDFCONTEXT_GLOBAL_SORTING));
+ if (requireGlobalOrder && !sorted)
+ throw new IOException("Global sorting can be only asked on table loaded as sorted");
if( sorted ) {
- TableInputFormat.requireSortedTable( job, null );
+ SplitMode splitMode =
+ requireGlobalOrder ? SplitMode.GLOBALLY_SORTED : SplitMode.LOCALLY_SORTED;
+ TableInputFormat.setSplitMode(job, splitMode, null);
sortInfo = TableInputFormat.getSortInfo( job );
}
try {
- Properties properties = UDFContext.getUDFContext().getUDFProperties(
- this.getClass(), new String[]{ udfContextSignature } );
String prunedProjStr = properties.getProperty( UDFCONTEXT_PROJ_STRING );
if( prunedProjStr != null ) {
@@ -414,4 +423,11 @@ public class TableLoader extends LoadFun
public void setUDFContextSignature(String signature) {
udfContextSignature = signature;
}
+
+ @Override
+ public void ensureAllKeyInstancesInSameSplit() throws IOException {
+ Properties properties = UDFContext.getUDFContext().getUDFProperties(this.getClass(),
+ new String[] { udfContextSignature } );
+ properties.setProperty(UDFCONTEXT_GLOBAL_SORTING, "true");
+ }
}
Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/BaseTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/BaseTestCase.java?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/BaseTestCase.java (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/BaseTestCase.java Sun Mar 28 12:16:54 2010
@@ -20,6 +20,9 @@ package org.apache.hadoop.zebra;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
import junit.framework.Assert;
@@ -31,6 +34,7 @@ import org.apache.hadoop.conf.Configured
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.data.Tuple;
public class BaseTestCase extends Configured {
protected static PigServer pigServer = null;
@@ -126,6 +130,45 @@ public class BaseTestCase extends Config
}
}
+ /**
+ * Verify union output table with expected results
+ *
+ */
+ protected int verifyTable(HashMap<Integer, ArrayList<ArrayList<Object>>> resultTable,
+ int keyColumn, int tblIdxCol, Iterator<Tuple> it) throws IOException {
+ int numbRows = 0;
+ int index = 0, rowIndex = -1, rowCount = -1, prevIndex = -1;
+ Object value;
+ boolean first = true;
+ ArrayList<ArrayList<Object>> rows = null;
+
+ while (it.hasNext()) {
+ Tuple rowValues = it.next();
+
+ if (first) {
+ index = (Integer) rowValues.get(tblIdxCol);
+ Assert.assertNotSame(prevIndex, index);
+ rows = resultTable.get(index);
+ rowIndex = 0;
+ rowCount = rows.size();
+ first = false;
+ }
+ value = rows.get(rowIndex++).get(keyColumn);
+ Assert.assertEquals("Table comparison error for row : " + numbRows + " - no key found for : "
+ + rowValues.get(keyColumn), value, rowValues.get(keyColumn));
+
+ if (rowIndex == rowCount)
+ {
+ // current table is exhausted; start on a new table in the next iteration
+ first = true;
+ prevIndex = index;
+ }
+
+ ++numbRows;
+ }
+ return numbRows;
+ }
+
public static void checkTableExists(boolean expected, String strDir) throws IOException {
File theDir = null;
boolean actual = false;
Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ToolTestComparator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ToolTestComparator.java?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ToolTestComparator.java (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ToolTestComparator.java Sun Mar 28 12:16:54 2010
@@ -37,7 +37,7 @@ public class ToolTestComparator extends
final static String TABLE_SCHEMA = "count:int,seed:int,int1:int,int2:int,str1:string,str2:string,byte1:bytes,"
+ "byte2:bytes,float1:float,long1:long,double1:double,m1:map(string),r1:record(f1:string, f2:string),"
- + "c1:collection(a:string, b:string)";
+ + "c1:collection(record(a:string, b:string))";
final static String TABLE_STORAGE = "[count,seed,int1,int2,str1,str2,byte1,byte2,float1,long1,double1];[m1#{a}];[r1,c1]";
private static Random generator = new Random();
@@ -399,13 +399,15 @@ public class ToolTestComparator extends
tuple.set(12, tupRecord1);
// insert collection1
- tupColl1.set(0, "c1 a " + seed);
- tupColl1.set(1, "c1 a " + i);
- bag1.add(tupColl1); // first collection item
-
- tupColl2.set(0, "c1 b " + seed);
- tupColl2.set(1, "c1 b " + i);
- bag1.add(tupColl2); // second collection item
+ // tupColl1.set(0, "c1 a " + seed);
+ // tupColl1.set(1, "c1 a " + i);
+ // bag1.add(tupColl1); // first collection item
+ bag1.add(tupRecord1); // first collection item
+ bag1.add(tupRecord1); // second collection item
+
+ // tupColl2.set(0, "c1 b " + seed);
+ // tupColl2.set(1, "c1 b " + i);
+ // bag1.add(tupColl2); // second collection item
tuple.set(13, bag1);
Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ToolTestComparator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ToolTestComparator.java?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ToolTestComparator.java (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ToolTestComparator.java Sun Mar 28 12:16:54 2010
@@ -19,45 +19,32 @@
package org.apache.hadoop.zebra.mapreduce;
import java.io.ByteArrayOutputStream;
-import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
-import java.io.Serializable;
import java.util.HashMap;
import java.util.Iterator;
import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import java.util.Random;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.zebra.io.BasicTable;
import org.apache.hadoop.zebra.io.TableInserter;
import org.apache.hadoop.zebra.io.TableScanner;
-import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
import org.apache.hadoop.zebra.mapreduce.SortedTableSplit;
import org.apache.hadoop.zebra.mapreduce.TableInputFormat;
import org.apache.hadoop.zebra.parser.ParseException;
import org.apache.hadoop.zebra.pig.TableStorer;
import org.apache.hadoop.zebra.schema.Schema;
import org.apache.hadoop.zebra.types.TypesUtils;
-import org.apache.pig.ExecType;
-import org.apache.pig.PigServer;
+import org.apache.hadoop.zebra.BaseTestCase;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecJob;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
-import org.apache.pig.test.MiniCluster;
import org.junit.Assert;
/**
@@ -66,25 +53,15 @@ import org.junit.Assert;
* Utility for verifying tables created during Zebra Stress Testing
*
*/
-public class ToolTestComparator {
+public class ToolTestComparator extends BaseTestCase {
final static String TABLE_SCHEMA = "count:int,seed:int,int1:int,int2:int,str1:string,str2:string,byte1:bytes,"
+ "byte2:bytes,float1:float,long1:long,double1:double,m1:map(string),r1:record(f1:string, f2:string),"
- + "c1:collection(a:string, b:string)";
+ + "c1:collection(record(a:string, b:string))";
final static String TABLE_STORAGE = "[count,seed,int1,int2,str1,str2,byte1,byte2,float1,long1,double1];[m1#{a}];[r1,c1]";
private static Random generator = new Random();
- private static Configuration conf;
- private static FileSystem fs;
-
- protected static ExecType execType = ExecType.MAPREDUCE;
- private static MiniCluster cluster;
- protected static PigServer pigServer;
protected static ExecJob pigJob;
- private static Path path;
-
- private static String zebraJar;
- private static String whichCluster;
private static int totalNumbCols;
private static long totalNumbVerifiedRows;
@@ -93,65 +70,7 @@ public class ToolTestComparator {
* Setup and initialize environment
*/
public static void setUp() throws Exception {
- System.out.println("setUp()");
- if (System.getProperty("hadoop.log.dir") == null) {
- String base = new File(".").getPath(); // getAbsolutePath();
- System
- .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
- }
-
-
- if (System.getProperty("whichCluster") == null) {
- System.setProperty("whichCluster", "miniCluster");
- whichCluster = System.getProperty("whichCluster");
- } else {
- whichCluster = System.getProperty("whichCluster");
- }
-
- System.out.println("cluster: " + whichCluster);
- if (whichCluster.equalsIgnoreCase("realCluster")
- && System.getenv("HADOOP_HOME") == null) {
- System.out.println("Please set HADOOP_HOME");
- System.exit(0);
- }
-
- conf = new Configuration();
-
- if (whichCluster.equalsIgnoreCase("realCluster")
- && System.getenv("USER") == null) {
- System.out.println("Please set USER");
- System.exit(0);
- }
- zebraJar = System.getenv("HADOOP_HOME") + "/../jars/zebra.jar";
- File file = new File(zebraJar);
- if (!file.exists() && whichCluster.equalsIgnoreCase("realCulster")) {
- System.out.println("Please put zebra.jar at hadoop_home/../jars");
- System.exit(0);
- }
-
- if (whichCluster.equalsIgnoreCase("realCluster")) {
- System.out.println("Running realCluster");
- pigServer = new PigServer(ExecType.MAPREDUCE, ConfigurationUtil
- .toProperties(conf));
- pigServer.registerJar(zebraJar);
- path = new Path("/user/" + System.getenv("USER") + "/TestComparator");
- // removeDir(path);
- fs = path.getFileSystem(conf);
- }
-
- if (whichCluster.equalsIgnoreCase("miniCluster")) {
- System.out.println("Running miniCluster");
- if (execType == ExecType.MAPREDUCE) {
- cluster = MiniCluster.buildCluster();
- pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
- fs = cluster.getFileSystem();
- path = new Path(fs.getWorkingDirectory() + "/TestComparator");
- // removeDir(path);
- System.out.println("path1 =" + path);
- } else {
- pigServer = new PigServer(ExecType.LOCAL);
- }
- }
+ init();
}
/**
@@ -389,7 +308,7 @@ public class ToolTestComparator {
while (it1.hasNext()) {
++numbRows1; // increment row count
- Tuple rowValue = it1.next();
+ it1.next();
}
numbRows.add(numbRows1);
numbUnionRows += numbRows1;
@@ -499,13 +418,15 @@ public class ToolTestComparator {
tuple.set(12, tupRecord1);
// insert collection1
- tupColl1.set(0, "c1 a " + seed);
- tupColl1.set(1, "c1 a " + i);
- bag1.add(tupColl1); // first collection item
-
- tupColl2.set(0, "c1 b " + seed);
- tupColl2.set(1, "c1 b " + i);
- bag1.add(tupColl2); // second collection item
+ // tupColl1.set(0, "c1 a " + seed);
+ // tupColl1.set(1, "c1 a " + i);
+ // bag1.add(tupColl1); // first collection item
+ bag1.add(tupRecord1); // first collection item
+ bag1.add(tupRecord1); // second collection item
+
+ // tupColl2.set(0, "c1 b " + seed);
+ // tupColl2.set(1, "c1 b " + i);
+ // bag1.add(tupColl2); // second collection item
tuple.set(13, bag1);
@@ -604,7 +525,7 @@ public class ToolTestComparator {
TableInputFormat.requireSortedTable(job, null);
TableInputFormat tif = new TableInputFormat();
- SortedTableSplit split = (SortedTableSplit) tif.getSplits(job).get(0);
+ SortedTableSplit split = (SortedTableSplit) tif.getSplits(job, true).get(0);
TableScanner scanner = reader.getScanner(split.getBegin(), split.getEnd(), true);
BytesWritable key = new BytesWritable();
@@ -731,12 +652,9 @@ public class ToolTestComparator {
TableInputFormat.setInputPaths(job, new Path(pathTable1));
TableInputFormat.requireSortedTable(job, null);
- TableInputFormat tif = new TableInputFormat();
-
TableScanner scanner = reader.getScanner(null, null, true);
BytesWritable key = new BytesWritable();
- Tuple rowValue = TypesUtils.createTuple(scanner.getSchema());
while (!scanner.atEnd()) {
++numbRows;
@@ -746,41 +664,6 @@ public class ToolTestComparator {
System.out.println("\nTable Path : " + pathTable1);
System.out.println("Table Row number : " + numbRows);
}
- /**
- * Compare table rows
- *
- */
- private static boolean compareRow(Tuple rowValues1, Tuple rowValues2)
- throws IOException {
- boolean result = true;
- Assert.assertEquals(rowValues1.size(), rowValues2.size());
- for (int i = 0; i < rowValues1.size(); ++i) {
- if (!compareObj(rowValues1.get(i), rowValues2.get(i))) {
- System.out.println("DEBUG: " + " RowValue.get(" + i
- + ") value compare error : " + rowValues1.get(i) + " : "
- + rowValues2.get(i));
- result = false;
- break;
- }
- }
- return result;
- }
-
- /**
- * Compare table values
- *
- */
- private static boolean compareObj(Object object1, Object object2) {
- if (object1 == null) {
- if (object2 == null)
- return true;
- else
- return false;
- } else if (object1.equals(object2))
- return true;
- else
- return false;
- }
/**
* Compares two objects that implement the Comparable interface
@@ -837,28 +720,6 @@ public class ToolTestComparator {
}
/**
- * Remove directory
- *
- */
- public static void removeDir(Path outPath) throws IOException {
- String command = null;
- if (whichCluster.equalsIgnoreCase("realCluster")) {
- command = System.getenv("HADOOP_HOME") + "/bin/hadoop fs -rmr "
- + outPath.toString();
- } else {
- command = "rm -rf " + outPath.toString();
- }
- Runtime runtime = Runtime.getRuntime();
- Process proc = runtime.exec(command);
- int exitVal = -1;
- try {
- exitVal = proc.waitFor();
- } catch (InterruptedException e) {
- System.err.println(e);
- }
- }
-
- /**
* Calculate elapsed time
*
*/
@@ -1021,7 +882,6 @@ public class ToolTestComparator {
// Verify merge-join table is in sorted order
verifyMergeJoin(pathTable1, sortCol, sortString, numbCols, rowMod,verifyDataColName);
} else if (verifyOption.equals("sorted-union")) {
- Object lastVal = null;
// Verify sorted-union table is in sorted order
verifySortedUnion(unionPaths, pathTable1, sortCol, sortString,
@@ -1045,7 +905,6 @@ public class ToolTestComparator {
// Create sorted table
createsortedtable(pathTable1, pathTable2, sortString, debug);
}else if (verifyOption.equals("printrownumber")) {
- Object lastVal = null;
//print total number of rows of the table
printRowNumber(pathTable1,sortString);
}
Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java Sun Mar 28 12:16:54 2010
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.util.Iterator;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
@@ -159,7 +160,7 @@ public class TestOrderPreserveMultiTable
paths += ++fileId + ",";
paths = paths.substring(0, paths.lastIndexOf(",")); // remove trailing comma
paths += "}";
-
+
String queryLoad = "records1 = LOAD '"
+ paths
+ "' USING org.apache.hadoop.zebra.pig.TableLoader('" + columns + "', 'sorted');";
@@ -185,25 +186,30 @@ public class TestOrderPreserveMultiTable
}
// Test with input tables and provided output columns
- testOrderPreserveUnion(inputTables, "int1", "int1, str1, byte1");
+ testOrderPreserveUnion(inputTables, "int1", "int1, str1, byte1, source_table");
// Create results table for verification
- ArrayList<ArrayList<Object>> resultTable = new ArrayList<ArrayList<Object>>();
+ HashMap<Integer, ArrayList<ArrayList<Object>>> resultTable =
+ new HashMap<Integer, ArrayList<ArrayList<Object>>>();
+
+ // The ordering from FileInputFormat glob expansion.
+ int[] tblIndexList = {0, 9, 1, 2, 3, 4, 5, 6, 7, 8};
+
for (int i=0; i<NUMB_TABLE; ++i) {
+ ArrayList<ArrayList<Object>> rows = new ArrayList<ArrayList<Object>>();
for (int j=0; j<NUMB_TABLE_ROWS; ++j) {
ArrayList<Object> resultRow = new ArrayList<Object>();
-
- resultRow.add(i); // int1
+ resultRow.add(tblIndexList[i]); // int1
resultRow.add(new String("string" + j)); // str1
resultRow.add(new DataByteArray("byte" + (NUMB_TABLE_ROWS - j))); // byte1
-
- resultTable.add(resultRow);
+ rows.add(resultRow);
}
+ resultTable.put(i, rows);
}
// Verify union table
Iterator<Tuple> it = pigServer.openIterator("records1");
- int numbRows = verifyTable(resultTable, 0, it);
+ int numbRows = verifyTable(resultTable, 0, 3, it);
Assert.assertEquals(totalTableRows, numbRows);
@@ -212,100 +218,6 @@ public class TestOrderPreserveMultiTable
}
/**
- * Verify union output table with expected results
- *
- */
- private int verifyTable(ArrayList<ArrayList<Object>> resultTable, int keyColumn, Iterator<Tuple> it) throws IOException {
- int numbRows = 0;
- int index = 0;
- Object value = resultTable.get(index).get(keyColumn); // get value of primary key
-
- while (it.hasNext()) {
- Tuple rowValues = it.next();
-
- // If last primary sort key does match then search for next matching key
- if (! compareObj(value, rowValues.get(keyColumn))) {
- int subIndex = index + 1;
- while (subIndex < resultTable.size()) {
- if ( ! compareObj(value, resultTable.get(subIndex).get(keyColumn)) ) { // found new key
- index = subIndex;
- value = resultTable.get(index).get(keyColumn);
- break;
- }
- ++subIndex;
- }
- Assert.assertEquals("Table comparison error for row : " + numbRows + " - no key found for : "
- + rowValues.get(keyColumn), value, rowValues.get(keyColumn));
- }
- // Search for matching row with this primary key
- int subIndex = index;
-
- while (subIndex < resultTable.size()) {
- // Compare row
- ArrayList<Object> resultRow = resultTable.get(subIndex);
- if ( compareRow(rowValues, resultRow) )
- break; // found matching row
- ++subIndex;
- Assert.assertEquals("Table comparison error for row : " + numbRows + " - no matching row found for : "
- + rowValues.get(keyColumn), value, resultTable.get(subIndex).get(keyColumn));
- }
- ++numbRows;
- }
- Assert.assertEquals(resultTable.size(), numbRows); // verify expected row count
- return numbRows;
- }
-
- /**
- * Compare table rows
- *
- */
- private boolean compareRow(Tuple rowValues, ArrayList<Object> resultRow) throws IOException {
- boolean result = true;
- Assert.assertEquals(resultRow.size(), rowValues.size());
- for (int i = 0; i < rowValues.size(); ++i) {
- if (! compareObj(rowValues.get(i), resultRow.get(i)) ) {
- result = false;
- break;
- }
- }
- return result;
- }
-
- /**
- * Compare table values
- *
- */
- private boolean compareObj(Object object1, Object object2) {
- if (object1 == null) {
- if (object2 == null)
- return true;
- else
- return false;
- } else if (object1.equals(object2))
- return true;
- else
- return false;
- }
-
- /**
- * Print Pig Table (for debugging)
- *
- */
- private int printTable(String tablename) throws IOException {
- Iterator<Tuple> it1 = pigServer.openIterator(tablename);
- int numbRows = 0;
- while (it1.hasNext()) {
- Tuple RowValue1 = it1.next();
- ++numbRows;
- System.out.println();
- for (int i = 0; i < RowValue1.size(); ++i)
- System.out.println("DEBUG: " + tablename + " RowValue.get(" + i + ") = " + RowValue1.get(i));
- }
- System.out.println("\nRow count : " + numbRows);
- return numbRows;
- }
-
- /**
* Return the name of the routine that called getCurrentMethodName
*
*/