Posted to commits@pig.apache.org by ya...@apache.org on 2010/02/13 01:06:17 UTC
svn commit: r909667 [2/9] - in
/hadoop/pig/branches/load-store-redesign/contrib/zebra: ./
src/java/org/apache/hadoop/zebra/ src/java/org/apache/hadoop/zebra/io/
src/java/org/apache/hadoop/zebra/mapred/
src/java/org/apache/hadoop/zebra/mapreduce/ src/ja...
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,1021 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.zebra.tfile.RawComparable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.BasicTableStatus;
+import org.apache.hadoop.zebra.io.BlockDistribution;
+import org.apache.hadoop.zebra.io.KeyDistribution;
+import org.apache.hadoop.zebra.io.BasicTable.Reader;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RowSplit;
+import org.apache.hadoop.zebra.mapreduce.TableExpr.LeafTableInfo;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.SortInfo;
+import org.apache.hadoop.zebra.tfile.TFile;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+/**
+ * {@link org.apache.hadoop.mapreduce.InputFormat} class for reading one or more
+ * BasicTables.
+ *
+ * Usage Example:
+ * <p>
+ * In the main program, add the following code.
+ *
+ * <pre>
+ * job.setInputFormatClass(TableInputFormat.class);
+ * TableInputFormat.setInputPaths(job, new Path("path/to/table1"), new Path("path/to/table2"));
+ * TableInputFormat.setProjection(job, "Name, Salary, BonusPct");
+ * </pre>
+ *
+ * The above code does the following things:
+ * <UL>
+ * <LI>Set the input format class to TableInputFormat.
+ * <LI>Set the paths to the BasicTables to be consumed by user's Mapper code.
+ * <LI>Set the projection on the input tables. In this case, the Mapper code is
+ * only interested in three fields: "Name", "Salary", and "BonusPct" (perhaps
+ * for the purpose of calculating each person's total payout). If no
+ * projection is specified, all columns from the input tables will be
+ * retrieved. If the input tables have different schemas, the input contains
+ * the union of all columns from all the input tables. Absent fields will be
+ * left as null in the input tuple.
+ * </UL>
+ * The user Mapper code should look like the following:
+ *
+ * <pre>
+ * static class MyMapClass extends Mapper<BytesWritable, Tuple, K, V> {
+ *   // indices of various fields in the input Tuple.
+ *   int idxName, idxSalary, idxBonusPct;
+ *
+ *   @Override
+ *   protected void setup(Context context) throws IOException, InterruptedException {
+ *     try {
+ *       // getProjection() returns the projection string; parse it into a Schema.
+ *       Schema projection = new Schema(TableInputFormat.getProjection(context));
+ *       // determine the field indices.
+ *       idxName = projection.getColumnIndex("Name");
+ *       idxSalary = projection.getColumnIndex("Salary");
+ *       idxBonusPct = projection.getColumnIndex("BonusPct");
+ *     } catch (ParseException e) {
+ *       throw new IOException("Projection parsing failed: " + e.getMessage());
+ *     }
+ *   }
+ *
+ *   @Override
+ *   protected void map(BytesWritable key, Tuple value, Context context)
+ *       throws IOException, InterruptedException {
+ *     try {
+ *       String name = (String) value.get(idxName);
+ *       int salary = (Integer) value.get(idxSalary);
+ *       double bonusPct = (Double) value.get(idxBonusPct);
+ *       // do something with the input data
+ *     } catch (ExecException e) {
+ *       e.printStackTrace();
+ *     }
+ *   }
+ * }
+ * </pre>
+ *
+ * A bit more explanation of the PIG {@link Tuple} objects: a Tuple is an
+ * ordered list of PIG datum objects. The permitted PIG datum types can be
+ * categorized as Scalar types and Composite types.
+ * <p>
+ * Supported Scalar types include seven native Java types: Boolean, Byte,
+ * Integer, Long, Float, Double, String, as well as one PIG class called
+ * {@link DataByteArray} that represents a type-less byte array.
+ * <p>
+ * Supported Composite types include:
+ * <UL>
+ * <LI>{@link Map} : It is the same as the Java Map class, with the additional
+ * restriction that the key-type must be one of the scalar types PIG recognizes,
+ * and the value-type any of the scalar or composite types PIG understands.
+ * <LI>{@link DataBag} : A DataBag is a collection of Tuples.
+ * <LI>{@link Tuple} : Yes, Tuple itself can be a datum in another Tuple.
+ * </UL>
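+ * <p>
+ * For illustration, such a Tuple could be assembled with the standard PIG
+ * factories (a sketch only; rawBytes stands in for any byte array):
+ *
+ * <pre>
+ * Tuple row = TupleFactory.getInstance().newTuple(3);
+ * row.set(0, "John Doe");                  // scalar: String
+ * row.set(1, new DataByteArray(rawBytes)); // scalar: type-less byte array
+ * DataBag bag = BagFactory.getInstance().newDefaultBag();
+ * bag.add(TupleFactory.getInstance().newTuple(1));
+ * row.set(2, bag);                         // composite: a bag of Tuples
+ * </pre>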
+ */
+public class TableInputFormat extends InputFormat<BytesWritable, Tuple> {
+ static Log LOG = LogFactory.getLog(TableInputFormat.class);
+
+ private static final String INPUT_EXPR = "mapreduce.lib.table.input.expr";
+ private static final String INPUT_PROJ = "mapreduce.lib.table.input.projection";
+ private static final String INPUT_SORT = "mapreduce.lib.table.input.sort";
+
+  /**
+   * Set the paths to the input tables.
+   *
+   * @param jobContext
+   *          JobContext object.
+   * @param paths
+   *          one or more paths to BasicTables. The InputFormat class will
+   *          produce splits on the "union" of these BasicTables.
+   */
+ public static void setInputPaths(JobContext jobContext, Path... paths) {
+ if (paths.length < 1) {
+      throw new IllegalArgumentException("Requiring at least one input path");
+ }
+
+ Configuration conf = jobContext.getConfiguration();
+
+ if (paths.length == 1) {
+ setInputExpr(conf, new BasicTableExpr(paths[0]));
+ }
+ else {
+ TableUnionExpr expr = new TableUnionExpr();
+ for (Path path : paths) {
+ expr.add(new BasicTableExpr(path));
+ }
+ setInputExpr(conf, expr);
+ }
+ }
+
+ /**
+ * Set the input expression in the Configuration object.
+ *
+ * @param conf
+ * Configuration object.
+ * @param expr
+ * The input table expression.
+ */
+ static void setInputExpr(Configuration conf, TableExpr expr) {
+ StringBuilder out = new StringBuilder();
+ expr.encode(out);
+ conf.set(INPUT_EXPR, out.toString());
+ }
+
+ static TableExpr getInputExpr(JobContext jobContext) throws IOException {
+ Configuration conf = jobContext.getConfiguration();
+ String expr = conf.get(INPUT_EXPR);
+ if (expr == null) {
+ // try setting from input path
+ Path[] paths = FileInputFormat.getInputPaths( jobContext );
+ if (paths != null) {
+ setInputPaths(jobContext, paths);
+ }
+ expr = conf.get(INPUT_EXPR);
+ }
+
+ if (expr == null) {
+ throw new IllegalArgumentException("Input expression not defined.");
+ }
+ StringReader in = new StringReader(expr);
+ return TableExpr.parse(in);
+ }
+
+  /**
+   * Get the schema of a table expression.
+   *
+   * @param jobContext
+   *          JobContext object.
+   */
+ public static Schema getSchema(JobContext jobContext) throws IOException
+ {
+ TableExpr expr = getInputExpr( jobContext );
+ return expr.getSchema( jobContext.getConfiguration() );
+ }
+
+ /**
+ * Set the input projection in the JobContext object.
+ *
+ * @param jobContext
+ * JobContext object.
+   * @param projection
+   *          A comma-separated list of column names. To select all columns,
+   *          pass projection==null. The syntax of the projection conforms to
+   *          the {@link Schema} string.
+ * @deprecated Use {@link #setProjection(JobContext, ZebraProjection)} instead.
+ */
+ public static void setProjection(JobContext jobContext, String projection) throws ParseException {
+ setProjection( jobContext.getConfiguration(), projection );
+ }
+
+ /**
+ * Set the input projection in the JobContext object.
+ *
+ * @param conf
+ * Configuration object.
+   * @param projection
+   *          A comma-separated list of column names. To select all columns,
+   *          pass projection==null. The syntax of the projection conforms to
+   *          the {@link Schema} string.
+ */
+ private static void setProjection(Configuration conf, String projection) throws ParseException {
+ conf.set(INPUT_PROJ, Schema.normalize(projection));
+
+ // virtual source_table columns require sorted table
+ if (Projection.getVirtualColumnIndices(projection) != null && !getSorted( conf ))
+      throw new ParseException("The source_table virtual column is only available for sorted table unions.");
+ }
+
+ /**
+ * Set the input projection in the JobContext object.
+ *
+ * @param jobContext
+ * JobContext object.
+   * @param projection
+   *          A comma-separated list of column names. To select all columns,
+   *          pass projection==null. The syntax of the projection conforms to
+   *          the {@link Schema} string.
+ *
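+   * For example (a sketch; the column names are illustrative):
+   * <pre>
+   * TableInputFormat.setProjection(jobContext,
+   *     ZebraProjection.createZebraProjection("Name, Salary"));
+   * </pre>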
+ */
+ public static void setProjection(JobContext jobContext, ZebraProjection projection) throws ParseException {
+ /* validity check on projection */
+ Schema schema = null;
+ String normalizedProjectionString = Schema.normalize(projection.toString());
+ try {
+ schema = getSchema( jobContext );
+ new org.apache.hadoop.zebra.types.Projection(schema, normalizedProjectionString);
+    } catch (ParseException e) {
+      throw new ParseException("[" + projection + "] is not a valid Zebra projection string: " + e.getMessage());
+    } catch (IOException e) {
+      throw new ParseException("[" + projection + "] is not a valid Zebra projection string: " + e.getMessage());
+    }
+
+ Configuration conf = jobContext.getConfiguration();
+ conf.set(INPUT_PROJ, normalizedProjectionString);
+
+ // virtual source_table columns require sorted table
+ if (Projection.getVirtualColumnIndices(projection.toString()) != null && !getSorted( conf ))
+      throw new ParseException("The source_table virtual column is only available for sorted table unions.");
+ }
+
+ /**
+ * Get the projection from the JobContext
+ *
+ * @param jobContext
+ * The JobContext object
+   * @return The projection string. If the projection has not been defined, or
+   *         is not known at this time, null will be returned. Note that by the
+   *         time this method is called in Mapper code, the projection must
+   *         already be known.
+ * @throws IOException
+ *
+ */
+ public static String getProjection(JobContext jobContext) throws IOException, ParseException {
+ Configuration conf = jobContext.getConfiguration();
+ String strProj = conf.get(INPUT_PROJ);
+ // TODO: need to be revisited
+ if (strProj != null) return strProj;
+ TableExpr expr = getInputExpr( jobContext );
+ if (expr != null) {
+ return expr.getSchema(conf).toProjectionString();
+ }
+ return null;
+ }
+
+  /**
+   * Set the requirement for sorted tables.
+   *
+   * @param conf
+   *          Configuration object.
+   */
+ private static void setSorted(Configuration conf) {
+ conf.setBoolean(INPUT_SORT, true);
+ }
+
+ /**
+   * Get the SortInfo object of a Zebra table.
+ *
+ * @param jobContext
+ * JobContext object
+   * @return the Zebra table's SortInfo; null if the table is unsorted.
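+   * <p>
+   * For example (a sketch):
+   * <pre>
+   * SortInfo sortInfo = TableInputFormat.getSortInfo(jobContext);
+   * if (sortInfo != null) {
+   *   String[] sortColumns = sortInfo.getSortColumnNames();
+   *   // inspect the sort columns ...
+   * }
+   * </pre>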
+ */
+ public static SortInfo getSortInfo(JobContext jobContext) throws IOException
+ {
+ Configuration conf = jobContext.getConfiguration();
+ TableExpr expr = getInputExpr(jobContext);
+ SortInfo result = null;
+ int sortSize = 0;
+ if (expr instanceof BasicTableExpr)
+ {
+ BasicTable.Reader reader = new BasicTable.Reader(((BasicTableExpr) expr).getPath(), conf);
+ SortInfo sortInfo = reader.getSortInfo();
+ reader.close();
+ result = sortInfo;
+ } else {
+ List<LeafTableInfo> leaves = expr.getLeafTables(null);
+ for (Iterator<LeafTableInfo> it = leaves.iterator(); it.hasNext(); )
+ {
+ LeafTableInfo leaf = it.next();
+ BasicTable.Reader reader = new BasicTable.Reader(leaf.getPath(), conf);
+ SortInfo sortInfo = reader.getSortInfo();
+ reader.close();
+ if (sortSize == 0)
+ {
+ sortSize = sortInfo.size();
+ result = sortInfo;
+ } else if (sortSize != sortInfo.size()) {
+ throw new IOException("Tables of the table union do not possess the same sort property.");
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+   * Require a sorted table or table union.
+ *
+ * @param jobContext
+ * JobContext object.
+ * @param sortInfo
+ * ZebraSortInfo object containing sorting information.
+ *
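+   * For example, to require only that every input table be sorted, without
+   * constraining the sort columns or comparator (a sketch; a null
+   * ZebraSortInfo is accepted):
+   * <pre>
+   * TableInputFormat.requireSortedTable(jobContext, null);
+   * </pre>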
+ */
+ public static void requireSortedTable(JobContext jobContext, ZebraSortInfo sortInfo) throws IOException {
+ TableExpr expr = getInputExpr( jobContext );
+ Configuration conf = jobContext.getConfiguration();
+ String comparatorName = null;
+ String[] sortcolumns = null;
+ if (sortInfo != null)
+ {
+      // ZebraSortInfo.getComparator() already includes the TFile comparator
+      // prefix; prepending it again would corrupt the comparator name.
+      comparatorName = sortInfo.getComparator();
+ String sortColumnNames = sortInfo.getSortColumns();
+ if (sortColumnNames != null)
+ sortcolumns = sortColumnNames.trim().split(SortInfo.SORTED_COLUMN_DELIMITER);
+ if (sortcolumns == null)
+ throw new IllegalArgumentException("No sort columns specified.");
+ }
+
+ if (expr instanceof BasicTableExpr)
+ {
+ BasicTable.Reader reader = new BasicTable.Reader(((BasicTableExpr) expr).getPath(), conf);
+ SortInfo mySortInfo = reader.getSortInfo();
+
+ reader.close();
+ if (mySortInfo == null)
+ throw new IOException("The table is not sorted");
+      if (comparatorName == null)
+        // no comparator specified by the caller: adopt the table's own
+        // comparator so the equals() check below only compares sort columns
+        comparatorName = mySortInfo.getComparator();
+ if (sortcolumns != null && !mySortInfo.equals(sortcolumns, comparatorName))
+ {
+ throw new IOException("The table is not properly sorted");
+ }
+ setSorted( conf );
+ } else {
+ List<LeafTableInfo> leaves = expr.getLeafTables(null);
+ for (Iterator<LeafTableInfo> it = leaves.iterator(); it.hasNext(); )
+ {
+ LeafTableInfo leaf = it.next();
+ BasicTable.Reader reader = new BasicTable.Reader(leaf.getPath(), conf);
+ SortInfo mySortInfo = reader.getSortInfo();
+ reader.close();
+ if (mySortInfo == null)
+ throw new IOException("The table is not sorted");
+ if (comparatorName == null)
+ comparatorName = mySortInfo.getComparator(); // use the first table's comparator as comparison base
+ if (sortcolumns == null)
+ {
+ sortcolumns = mySortInfo.getSortColumnNames();
+ comparatorName = mySortInfo.getComparator();
+ } else {
+ if (!mySortInfo.equals(sortcolumns, comparatorName))
+ {
+ throw new IOException("The table is not properly sorted");
+ }
+ }
+ }
+ // need key range input splits for sorted table union
+ setSorted( conf );
+ }
+ }
+
+  /**
+   * Get the requirement for sorted tables.
+   *
+   * @param conf
+   *          Configuration object.
+   */
+ private static boolean getSorted(Configuration conf) {
+ return conf.getBoolean(INPUT_SORT, false);
+ }
+
+ /**
+ * @see InputFormat#createRecordReader(InputSplit, TaskAttemptContext)
+ */
+ @Override
+ public RecordReader<BytesWritable, Tuple> createRecordReader(InputSplit split,
+ TaskAttemptContext taContext) throws IOException,InterruptedException {
+ return createRecordReaderFromJobContext( split, taContext );
+ }
+
+ private RecordReader<BytesWritable, Tuple> createRecordReaderFromJobContext(InputSplit split,
+ JobContext jobContext) throws IOException,InterruptedException {
+ Configuration conf = jobContext.getConfiguration();
+
+ TableExpr expr = getInputExpr( conf );
+ if (expr == null) {
+ throw new IOException("Table expression not defined");
+ }
+
+ if ( getSorted( conf ) )
+ expr.setSortedSplit();
+
+ String strProj = conf.get(INPUT_PROJ);
+ String projection = null;
+ try {
+ if (strProj == null) {
+ projection = expr.getSchema(conf).toProjectionString();
+ ZebraProjection proj = ZebraProjection.createZebraProjection( projection );
+ TableInputFormat.setProjection( jobContext, proj );
+ } else {
+ projection = strProj;
+ }
+ } catch (ParseException e) {
+ throw new IOException("Projection parsing failed : "+e.getMessage());
+ }
+
+ try {
+ return new TableRecordReader(expr, projection, split, jobContext);
+ } catch (ParseException e) {
+      throw new IOException("Projection parsing failed: " + e.getMessage());
+ }
+ }
+
+ private static TableExpr getInputExpr(Configuration conf) throws IOException {
+ String expr = conf.get(INPUT_EXPR);
+ if (expr == null) {
+ throw new IllegalArgumentException("Input expression not defined.");
+ }
+ StringReader in = new StringReader(expr);
+ return TableExpr.parse(in);
+
+ }
+
+ /**
+ * Get a TableRecordReader on a single split
+ *
+ * @param jobContext
+ * JobContext object.
+   * @param projection
+   *          comma-separated column names in the projection; null means all
+   *          columns.
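+   *
+   * <p>
+   * A minimal sketch of a point lookup (the key and projection string are
+   * illustrative; the input must have been declared sorted via
+   * requireSortedTable):
+   * <pre>
+   * TableRecordReader reader =
+   *     TableInputFormat.createTableRecordReader(jobContext, "Name, Salary");
+   * reader.seekTo(new BytesWritable("john".getBytes()));
+   * while (reader.nextKeyValue()) {
+   *   Tuple row = reader.getCurrentValue();
+   *   // process row ...
+   * }
+   * reader.close();
+   * </pre>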
+ */
+ public static TableRecordReader createTableRecordReader(JobContext jobContext, String projection)
+ throws IOException, ParseException, InterruptedException {
+ if (projection != null)
+ setProjection( jobContext, ZebraProjection.createZebraProjection( projection ) );
+ TableInputFormat inputFormat = new TableInputFormat();
+
+ // a single split is needed
+ List<InputSplit> splits = inputFormat.getSplits( jobContext, true );
+ if( splits.size() != 1 )
+      throw new IOException("Unable to generate a single split for the sorted table (internal error)");
+ return (TableRecordReader)inputFormat.createRecordReaderFromJobContext( splits.get( 0 ), jobContext );
+ }
+
+ private static List<InputSplit> getSortedSplits(Configuration conf, int numSplits,
+ TableExpr expr, List<BasicTable.Reader> readers,
+ List<BasicTableStatus> status) throws IOException {
+ if (expr.sortedSplitRequired() && !expr.sortedSplitCapable()) {
+      throw new IOException("Unable to create sorted splits");
+ }
+
+ long totalBytes = 0;
+ for (Iterator<BasicTableStatus> it = status.iterator(); it.hasNext();) {
+ BasicTableStatus s = it.next();
+ totalBytes += s.getSize();
+ }
+
+ long maxSplits = totalBytes / getMinSplitSize( conf );
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ if (maxSplits == 0)
+ numSplits = 1;
+ else if (numSplits > maxSplits) {
+ numSplits = -1;
+ }
+
+ for (Iterator<BasicTable.Reader> it = readers.iterator(); it.hasNext();) {
+ BasicTable.Reader reader = it.next();
+ if (!reader.isSorted()) {
+ throw new IOException("Attempting sorted split on unsorted table");
+ }
+ }
+
+ if (numSplits == 1) {
+ BlockDistribution bd = null;
+ for (Iterator<BasicTable.Reader> it = readers.iterator(); it.hasNext();) {
+ BasicTable.Reader reader = it.next();
+ bd = BlockDistribution.sum(bd, reader.getBlockDistribution((RangeSplit)null));
+ }
+
+ SortedTableSplit split = new SortedTableSplit(null, null, bd, conf);
+ splits.add(split);
+ return splits;
+ }
+
+ // TODO: Does it make sense to interleave keys for all leaf tables if
+ // numSplits <= 0 ?
+ int nLeaves = readers.size();
+ KeyDistribution keyDistri = null;
+ for (int i = 0; i < nLeaves; ++i) {
+ KeyDistribution btKeyDistri =
+ readers.get(i).getKeyDistribution(
+ (numSplits <= 0) ? -1 :
+ Math.max(numSplits * 5 / nLeaves, numSplits));
+ keyDistri = KeyDistribution.sum(keyDistri, btKeyDistri);
+ }
+
+ if (keyDistri == null) {
+ // should never happen.
+ return splits;
+ }
+
+ if (numSplits > 0) {
+ keyDistri.resize(numSplits);
+ }
+
+ RawComparable[] rawKeys = keyDistri.getKeys();
+ BytesWritable[] keys = new BytesWritable[rawKeys.length];
+ for (int i=0; i<keys.length; ++i) {
+ RawComparable rawKey = rawKeys[i];
+ keys[i] = new BytesWritable();
+ keys[i].setSize(rawKey.size());
+ System.arraycopy(rawKey.buffer(), rawKey.offset(), keys[i].getBytes(), 0,
+ rawKey.size());
+ }
+
+ // TODO: Should we change to RawComparable to avoid the creation of
+ // BytesWritables?
+ for (int i = 0; i < keys.length; ++i) {
+ BytesWritable begin = (i == 0) ? null : keys[i - 1];
+ BytesWritable end = (i == keys.length - 1) ? null : keys[i];
+ BlockDistribution bd = keyDistri.getBlockDistribution(keys[i]);
+ SortedTableSplit split = new SortedTableSplit(begin, end, bd, conf);
+ splits.add(split);
+ }
+ return splits;
+ }
+
+ static long getMinSplitSize(Configuration conf) {
+ return conf.getLong("table.input.split.minSize", 1 * 1024 * 1024L);
+ }
+
+ /**
+ * Set the minimum split size.
+ *
+   * @param jobContext
+   *          JobContext object.
+   * @param minSize
+   *          Minimum split size in bytes.
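+   *
+   * For example, to raise the lower bound to 64MB (a sketch):
+   * <pre>
+   * TableInputFormat.setMinSplitSize(jobContext, 64 * 1024 * 1024L);
+   * </pre>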
+ */
+ public static void setMinSplitSize(JobContext jobContext, long minSize) {
+ jobContext.getConfiguration().setLong("table.input.split.minSize", minSize);
+ }
+
+ private static class DummyFileInputFormat extends FileInputFormat<BytesWritable, Tuple> {
+ public DummyFileInputFormat(Job job, long minSplitSize) {
+ super.setMinInputSplitSize( job, minSplitSize );
+ }
+
+ @Override
+ public RecordReader<BytesWritable, Tuple> createRecordReader(
+ InputSplit arg0, TaskAttemptContext arg1) throws IOException,
+ InterruptedException {
+ // no-op
+ return null;
+ }
+ }
+
+ private static List<InputSplit> getRowSplits(Configuration conf,
+ TableExpr expr, List<BasicTable.Reader> readers,
+ List<BasicTableStatus> status) throws IOException {
+ ArrayList<InputSplit> ret = new ArrayList<InputSplit>();
+ Job job = new Job(conf);
+ DummyFileInputFormat helper = new DummyFileInputFormat(job, getMinSplitSize(conf));
+
+ for (int i = 0; i < readers.size(); ++i) {
+ BasicTable.Reader reader = readers.get(i);
+ /* Get the index of the column group that will be used for row-split.*/
+ int splitCGIndex = reader.getRowSplitCGIndex();
+
+      /* We can create input splits only if a valid column group exists for
+       * the row-split; otherwise, no input splits are created. */
+ if (splitCGIndex >= 0) {
+ Path path = new Path (reader.getPath().toString() + "/" + reader.getName(splitCGIndex));
+ DummyFileInputFormat.setInputPaths(job, path);
+ PathFilter filter = reader.getPathFilter( job.getConfiguration() );
+ DummyFileInputFormat.setInputPathFilter(job, filter.getClass());
+
+ List<InputSplit> inputSplits = helper.getSplits( job );
+
+ long starts[] = new long[inputSplits.size()];
+ long lengths[] = new long[inputSplits.size()];
+ Path paths[] = new Path [inputSplits.size()];
+ for (int j=0; j<inputSplits.size(); j++) {
+ FileSplit fileSplit = (FileSplit) inputSplits.get( j );
+ Path p = fileSplit.getPath();
+ long start = fileSplit.getStart();
+ long length = fileSplit.getLength();
+
+ starts[j] = start;
+ lengths[j] = length;
+ paths[j] = p;
+ }
+
+ List<RowSplit> subSplits = reader.rowSplit(starts, lengths, paths, splitCGIndex);
+
+ for (Iterator<RowSplit> it = subSplits.iterator(); it.hasNext();) {
+ RowSplit subSplit = it.next();
+ RowTableSplit split = new RowTableSplit(reader, subSplit, conf);
+ ret.add(split);
+ }
+ }
+ }
+
+ LOG.info("getSplits : returning " + ret.size() + " row splits.");
+ return ret;
+ }
+
+ /**
+ * @see InputFormat#getSplits(JobContext)
+ */
+ @Override
+ public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
+ return getSplits( jobContext, false );
+ }
+
+ private List<InputSplit> getSplits(JobContext jobContext, boolean singleSplit) throws IOException {
+ Configuration conf = jobContext.getConfiguration();
+ TableExpr expr = getInputExpr( conf );
+ if( getSorted(conf) ) {
+ expr.setSortedSplit();
+ } else if( singleSplit ) {
+ throw new IOException( "Table must be sorted in order to get a single sorted split" );
+ }
+
+ if( expr.sortedSplitRequired() && !expr.sortedSplitCapable() ) {
+      throw new IOException( "Unable to create sorted splits" );
+ }
+
+ String projection;
+ try {
+ projection = getProjection(jobContext);
+ } catch (ParseException e) {
+ throw new IOException("getProjection failed : "+e.getMessage());
+ }
+ List<LeafTableInfo> leaves = expr.getLeafTables(projection);
+ int nLeaves = leaves.size();
+ ArrayList<BasicTable.Reader> readers =
+ new ArrayList<BasicTable.Reader>(nLeaves);
+ ArrayList<BasicTableStatus> status =
+ new ArrayList<BasicTableStatus>(nLeaves);
+
+ try {
+ for (Iterator<LeafTableInfo> it = leaves.iterator(); it.hasNext();) {
+ LeafTableInfo leaf = it.next();
+ BasicTable.Reader reader =
+ new BasicTable.Reader(leaf.getPath(), conf );
+ reader.setProjection(leaf.getProjection());
+ BasicTableStatus s = reader.getStatus();
+ readers.add(reader);
+ status.add(s);
+ }
+
+ if( readers.isEmpty() ) {
+ // I think we should throw exception here.
+ return new ArrayList<InputSplit>();
+ }
+
+      if (expr.sortedSplitRequired()) {
+        return getSortedSplits(conf, singleSplit ? 1 : -1, expr, readers, status);
+      }
+      return getRowSplits(conf, expr, readers, status);
+ } catch (ParseException e) {
+ throw new IOException("Projection parsing failed : "+e.getMessage());
+ } finally {
+ for (Iterator<BasicTable.Reader> it = readers.iterator(); it.hasNext();) {
+ try {
+ it.next().close();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ // TODO: log the error here.
+ }
+ }
+ }
+ }
+
+ @Deprecated
+ public synchronized void validateInput(JobContext jobContext) throws IOException {
+    // Validate the inputs by opening all tables.
+ TableExpr expr = getInputExpr(jobContext);
+ try {
+ String projection = getProjection(jobContext);
+ List<LeafTableInfo> leaves = expr.getLeafTables(projection);
+ Iterator<LeafTableInfo> iterator = leaves.iterator();
+ while (iterator.hasNext()) {
+ LeafTableInfo leaf = iterator.next();
+ BasicTable.Reader reader =
+ new BasicTable.Reader(leaf.getPath(), jobContext.getConfiguration());
+ reader.setProjection(projection);
+ reader.close();
+ }
+ } catch (ParseException e) {
+ throw new IOException("Projection parsing failed : "+e.getMessage());
+ }
+ }
+}
+
+/**
+ * Adaptor class for sorted InputSplit for table.
+ */
+class SortedTableSplit extends InputSplit implements Writable {
+
+ BytesWritable begin = null, end = null;
+
+ String[] hosts;
+ long length = 1;
+
+ public SortedTableSplit()
+ {
+ // no-op for Writable construction
+ }
+
+ public SortedTableSplit(BytesWritable begin, BytesWritable end,
+ BlockDistribution bd, Configuration conf) {
+ if (begin != null) {
+ this.begin = new BytesWritable();
+ this.begin.set(begin.getBytes(), 0, begin.getLength());
+ }
+ if (end != null) {
+ this.end = new BytesWritable();
+ this.end.set(end.getBytes(), 0, end.getLength());
+ }
+
+ if (bd != null) {
+ length = bd.getLength();
+ hosts =
+ bd.getHosts( conf.getInt("mapred.lib.table.input.nlocation", 5) );
+ }
+ }
+
+ @Override
+ public long getLength() throws IOException {
+ return length;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException {
+ if (hosts == null)
+ {
+ String[] tmp = new String[1];
+ tmp[0] = "";
+ return tmp;
+ }
+ return hosts;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ begin = end = null;
+ int bool = WritableUtils.readVInt(in);
+ if (bool == 1) {
+ begin = new BytesWritable();
+ begin.readFields(in);
+ }
+ bool = WritableUtils.readVInt(in);
+ if (bool == 1) {
+ end = new BytesWritable();
+ end.readFields(in);
+ }
+ length = WritableUtils.readVLong(in);
+    int size = WritableUtils.readVInt(in);
+    hosts = (size > 0) ? new String[size] : null;
+    for (int i = 0; i < size; i++)
+      hosts[i] = WritableUtils.readString(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ if (begin == null) {
+ WritableUtils.writeVInt(out, 0);
+ }
+ else {
+ WritableUtils.writeVInt(out, 1);
+ begin.write(out);
+ }
+ if (end == null) {
+ WritableUtils.writeVInt(out, 0);
+ }
+ else {
+ WritableUtils.writeVInt(out, 1);
+ end.write(out);
+ }
+ WritableUtils.writeVLong(out, length);
+    int numHosts = (hosts == null) ? 0 : hosts.length;
+    WritableUtils.writeVInt(out, numHosts);
+    // guard against a null hosts array when iterating
+    for (int i = 0; i < numHosts; i++)
+    {
+      WritableUtils.writeString(out, hosts[i]);
+    }
+ }
+
+ public BytesWritable getBegin() {
+ return begin;
+ }
+
+ public BytesWritable getEnd() {
+ return end;
+ }
+}
+
+/**
+ * Adaptor class for unsorted InputSplit for table.
+ */
+class UnsortedTableSplit extends InputSplit implements Writable {
+ String path = null;
+ RangeSplit split = null;
+ String[] hosts = null;
+ long length = 1;
+
+ public UnsortedTableSplit(Reader reader, RangeSplit split, Configuration conf)
+ throws IOException {
+ this.path = reader.getPath();
+ this.split = split;
+ BlockDistribution dataDist = reader.getBlockDistribution(split);
+ if (dataDist != null) {
+ length = dataDist.getLength();
+ hosts =
+ dataDist.getHosts(conf.getInt("mapred.lib.table.input.nlocation", 5));
+ }
+ }
+
+ public UnsortedTableSplit() {
+ // no-op for Writable construction
+ }
+
+ @Override
+ public long getLength() throws IOException {
+ return length;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException {
+ if (hosts == null)
+ {
+ String[] tmp = new String[1];
+ tmp[0] = "";
+ return tmp;
+ }
+ return hosts;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ path = WritableUtils.readString(in);
+ int bool = WritableUtils.readVInt(in);
+ if (bool == 1) {
+ if (split == null) split = new RangeSplit();
+ split.readFields(in);
+ }
+ else {
+ split = null;
+ }
+ hosts = WritableUtils.readStringArray(in);
+ length = WritableUtils.readVLong(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ WritableUtils.writeString(out, path);
+ if (split == null) {
+ WritableUtils.writeVInt(out, 0);
+ }
+ else {
+ WritableUtils.writeVInt(out, 1);
+ split.write(out);
+ }
+ WritableUtils.writeStringArray(out, hosts);
+ WritableUtils.writeVLong(out, length);
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public RangeSplit getSplit() {
+ return split;
+ }
+}
+
+/**
+ * Adaptor class for row-based InputSplit for table.
+ */
+class RowTableSplit extends InputSplit implements Writable {
+  String path = null;
+ RowSplit split = null;
+ String[] hosts = null;
+ long length = 1;
+
+ public RowTableSplit(Reader reader, RowSplit split, Configuration conf)
+ throws IOException {
+ this.path = reader.getPath();
+ this.split = split;
+ BlockDistribution dataDist = reader.getBlockDistribution(split);
+ if (dataDist != null) {
+ length = dataDist.getLength();
+ hosts =
+ dataDist.getHosts(conf.getInt("mapred.lib.table.input.nlocation", 5));
+ }
+ }
+
+ public RowTableSplit() {
+ // no-op for Writable construction
+ }
+
+ @Override
+ public long getLength() throws IOException {
+ return length;
+ }
+
+ @Override
+  public String[] getLocations() throws IOException {
+    if (hosts == null) {
+      // match the other split classes: report an empty location, not null
+      return new String[] { "" };
+    }
+    return hosts;
+  }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ path = WritableUtils.readString(in);
+ int bool = WritableUtils.readVInt(in);
+ if (bool == 1) {
+ if (split == null) split = new RowSplit();
+ split.readFields(in);
+ }
+ else {
+ split = null;
+ }
+ hosts = WritableUtils.readStringArray(in);
+ length = WritableUtils.readVLong(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ WritableUtils.writeString(out, path);
+ if (split == null) {
+ WritableUtils.writeVInt(out, 0);
+ }
+ else {
+ WritableUtils.writeVInt(out, 1);
+ split.write(out);
+ }
+ WritableUtils.writeStringArray(out, hosts);
+ WritableUtils.writeVLong(out, length);
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public RowSplit getSplit() {
+ return split;
+ }
+}
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Adaptor class to implement RecordReader on top of Scanner.
+ */
+public class TableRecordReader extends RecordReader<BytesWritable, Tuple> {
+ private final TableScanner scanner;
+ private long count = 0;
+ private BytesWritable key = null;
+ private Tuple value = null;
+
+ /**
+ *
+ * @param expr
+ * Table expression
+ * @param projection
+ * projection schema. Should never be null.
+ * @param split
+ * the split to work on
+ * @param jobContext
+ * JobContext object
+ * @throws IOException
+ */
+ public TableRecordReader(TableExpr expr, String projection,
+ InputSplit split, JobContext jobContext) throws IOException, ParseException {
+ Configuration conf = jobContext.getConfiguration();
+ if (expr.sortedSplitRequired()) {
+ SortedTableSplit tblSplit = (SortedTableSplit) split;
+ scanner =
+ expr.getScanner(tblSplit.getBegin(), tblSplit.getEnd(), projection,
+ conf);
+ } else if (split != null && split instanceof RowTableSplit) {
+ RowTableSplit rowSplit = (RowTableSplit) split;
+ scanner = expr.getScanner(rowSplit, projection, conf);
+ }
+ else {
+ UnsortedTableSplit tblSplit = (UnsortedTableSplit) split;
+ scanner = expr.getScanner(tblSplit, projection, conf);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ scanner.close();
+ }
+
+ public long getPos() throws IOException {
+ return count;
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return (float)((scanner.atEnd()) ? 1.0 : 0);
+  }
+
+  /**
+ * Seek to the position at the first row which has the key
+ * or just after the key; only applicable for sorted Zebra table
+ *
+ * @param key
+ * the key to seek on
+ */
+ public boolean seekTo(BytesWritable key) throws IOException {
+ return scanner.seekTo(key);
+ }
+
+ /**
+ * Check if the end of the input has been reached
+ *
+ * @return true if the end of the input is reached
+ */
+ public boolean atEnd() throws IOException {
+ return scanner.atEnd();
+ }
+
+ @Override
+ public BytesWritable getCurrentKey() throws IOException, InterruptedException {
+ return key;
+ }
+
+ @Override
+ public Tuple getCurrentValue() throws IOException, InterruptedException {
+ return value;
+ }
+
+ @Override
+  public void initialize(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+ // no-op
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if( scanner.atEnd() ) {
+ key = null;
+ value = null;
+ return false;
+ }
+
+ if( key == null ) {
+ key = new BytesWritable();
+ }
+ if (value == null) {
+ value = TypesUtils.createTuple(Projection.getNumColumns(scanner.getProjection()));
+ }
+ scanner.getKey(key);
+ scanner.getValue(value);
+ scanner.advance();
+ count++;
+ return true;
+ }
+}
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.BasicTableStatus;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Table expression supporting a union of BasicTables.
+ *
+ * @see <a href="doc-files/examples/ReadTableUnion.java">Usage example for
+ * TableUnionExpr</a>
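+ *
+ * <p>
+ * A union can be assembled incrementally (a sketch; the paths are
+ * illustrative):
+ * <pre>
+ * TableUnionExpr union = new TableUnionExpr()
+ *     .add(new BasicTableExpr(new Path("path/to/table1")))
+ *     .add(new BasicTableExpr(new Path("path/to/table2")));
+ * </pre>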
+ */
+class TableUnionExpr extends CompositeTableExpr {
+ /**
+ * Add another BasicTable into the table-union.
+ *
+ * @param expr
+ * The expression for the BasicTable to be added.
+ * @return self.
+ */
+ public TableUnionExpr add(BasicTableExpr expr) {
+ super.addCompositeTable(expr);
+ return this;
+ }
+
+ /**
+ * Add an array of BasicTables into the table-union.
+ *
+ * @param exprs
+ * the expressions representing the BasicTables to be added.
+ * @return self.
+ */
+ public TableUnionExpr add(BasicTableExpr[] exprs) {
+ super.addCompositeTables(exprs);
+ return this;
+ }
+
+ /**
+ * Add a Collection of BasicTables into the table-union.
+ *
+ * @param exprs
+ * the expressions representing the BasicTables to be added.
+ * @return self.
+ */
+ public TableUnionExpr add(Collection<? extends BasicTableExpr> exprs) {
+ super.addCompositeTables(exprs);
+ return this;
+ }
+
+ @Override
+ protected TableUnionExpr decodeParam(StringReader in) throws IOException {
+ super.decodeParam(in);
+ int n = composite.size();
+ for (int i = 0; i < n; ++i) {
+ if (!(composite.get(i) instanceof BasicTableExpr)) {
+ throw new RuntimeException("Not a BasicTableExpr");
+ }
+ }
+ return this;
+ }
+
+ @Override
+ protected TableUnionExpr encodeParam(StringBuilder out) {
+ super.encodeParam(out);
+ return this;
+ }
+
+ @Override
+ public TableScanner getScanner(BytesWritable begin, BytesWritable end,
+ String projection, Configuration conf) throws IOException {
+ int n = composite.size();
+ if (n==0) {
+      throw new IllegalArgumentException("Union of zero tables");
+ }
+ ArrayList<BasicTable.Reader> readers = new ArrayList<BasicTable.Reader>(n);
+ final ArrayList<BasicTableStatus> status =
+ new ArrayList<BasicTableStatus>(n);
+ for (int i = 0; i < n; ++i) {
+ BasicTableExpr expr = (BasicTableExpr) composite.get(i);
+ BasicTable.Reader reader =
+ new BasicTable.Reader(expr.getPath(), conf);
+ readers.add(reader);
+ status.add(reader.getStatus());
+ }
+
+ String actualProjection = projection;
+ if (actualProjection == null) {
+ // Perform a union on all column names.
+ LinkedHashSet<String> colNameSet = new LinkedHashSet<String>();
+ for (int i = 0; i < n; ++i) {
+ String[] cols = readers.get(i).getSchema().getColumns();
+ for (String col : cols) {
+ colNameSet.add(col);
+ }
+ }
+
+ actualProjection =
+ Projection.getProjectionStr(colNameSet.toArray(new String[colNameSet.size()]));
+ }
+
+ ArrayList<TableScanner> scanners = new ArrayList<TableScanner>(n);
+ try {
+ for (int i=0; i<n; ++i) {
+ BasicTable.Reader reader = readers.get(i);
+ reader.setProjection(actualProjection);
+ TableScanner scanner = readers.get(i).getScanner(begin, end, true);
+ scanners.add(scanner);
+ }
+ } catch (ParseException e) {
+ throw new IOException("Projection parsing failed : "+e.getMessage());
+ }
+
+ if (scanners.isEmpty()) {
+ return new NullScanner(actualProjection);
+ }
+
+ Integer[] virtualColumnIndices = Projection.getVirtualColumnIndices(projection);
+ if (virtualColumnIndices != null && n == 1)
+ throw new IllegalArgumentException("virtual column requires union of multiple tables");
+    return new SortedTableUnionScanner(scanners, virtualColumnIndices);
+ }
+}
+
+/**
+ * Union scanner.
+ */
+class SortedTableUnionScanner implements TableScanner {
+ CachedTableScanner[] scanners;
+ PriorityBlockingQueue<CachedTableScanner> queue;
+ boolean synced = false;
+ boolean hasVirtualColumns = false;
+ Integer[] virtualColumnIndices = null;
+ CachedTableScanner scanner = null; // the working scanner
+
+ SortedTableUnionScanner(List<TableScanner> scanners, Integer[] vcolindices) throws IOException {
+ if (scanners.isEmpty()) {
+ throw new IllegalArgumentException("Zero-sized table union");
+ }
+
+ this.scanners = new CachedTableScanner[scanners.size()];
+ queue =
+ new PriorityBlockingQueue<CachedTableScanner>(scanners.size(),
+ new Comparator<CachedTableScanner>() {
+
+ @Override
+ public int compare(CachedTableScanner o1, CachedTableScanner o2) {
+ try {
+ return o1.getKey().compareTo(o2.getKey());
+ }
+ catch (IOException e) {
+ throw new RuntimeException("IOException: " + e.toString());
+ }
+ }
+
+ });
+
+ for (int i = 0; i < this.scanners.length; ++i) {
+ TableScanner scanner = scanners.get(i);
+ this.scanners[i] = new CachedTableScanner(scanner, i);
+ }
+ // initial fill-ins
+ if (!atEnd())
+ scanner = queue.poll();
+ virtualColumnIndices = vcolindices;
+ hasVirtualColumns = (vcolindices != null && vcolindices.length != 0);
+ }
+
+
+ private void sync() throws IOException {
+    if (!synced) {
+ queue.clear();
+ for (int i = 0; i < scanners.length; ++i) {
+ if (!scanners[i].atEnd()) {
+ queue.add(scanners[i]);
+ }
+ }
+ synced = true;
+ }
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ sync();
+ scanner.advance();
+ if (!scanner.atEnd()) {
+ queue.add(scanner);
+ }
+ scanner = queue.poll();
+ return (scanner != null);
+ }
+
+ @Override
+ public boolean atEnd() throws IOException {
+ sync();
+ return (scanner == null && queue.isEmpty());
+ }
+
+ @Override
+ public String getProjection() {
+ return scanners[0].getProjection();
+ }
+
+ @Override
+ public Schema getSchema() {
+ return scanners[0].getSchema();
+ }
+
+ @Override
+ public void getKey(BytesWritable key) throws IOException {
+ if (atEnd()) {
+ throw new EOFException("No more rows to read");
+ }
+
+ key.set(scanner.getKey());
+ }
+
+ @Override
+ public void getValue(Tuple row) throws IOException {
+ if (atEnd()) {
+ throw new EOFException("No more rows to read");
+ }
+
+ Tuple tmp = scanner.getValue();
+ if (hasVirtualColumns)
+ {
+ for (int i = 0; i < virtualColumnIndices.length; i++)
+ {
+ tmp.set(virtualColumnIndices[i], scanner.getIndex());
+ }
+ }
+ row.reference(tmp);
+ }
+
+ @Override
+ public boolean seekTo(BytesWritable key) throws IOException {
+    boolean rv = false;
+    for (CachedTableScanner scanner : scanners) {
+      // seek every scanner; do not short-circuit, or scanners after the
+      // first hit would be left at their previous positions
+      if (scanner.seekTo(key)) {
+        rv = true;
+      }
+    }
+ synced = false;
+ if (!atEnd())
+ scanner = queue.poll();
+ return rv;
+ }
+
+ @Override
+ public void seekToEnd() throws IOException {
+ for (CachedTableScanner scanner : scanners) {
+ scanner.seekToEnd();
+ }
+ scanner = null;
+ synced = false;
+ }
+
+ @Override
+ public void close() throws IOException {
+ for (CachedTableScanner scanner : scanners) {
+ scanner.close();
+ }
+ queue.clear();
+ }
+}
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraOutputPartition.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraOutputPartition.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraOutputPartition.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraOutputPartition.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.pig.data.Tuple;
+
+
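+/**
+ * Abstract hook for routing each output record to one of multiple output
+ * partitions (tables): {@link #getOutputPartition(BytesWritable, Tuple)}
+ * returns the zero-based index of the destination partition. A sketch of a
+ * possible subclass (the hashing scheme and the "my.num.partitions" key are
+ * illustrative only):
+ *
+ * <pre>
+ * public class HashOutputPartition extends ZebraOutputPartition {
+ *   @Override
+ *   public int getOutputPartition(BytesWritable key, Tuple value) throws IOException {
+ *     int n = getConf().getInt("my.num.partitions", 2);
+ *     return (key.hashCode() & Integer.MAX_VALUE) % n;
+ *   }
+ * }
+ * </pre>
+ */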
+public abstract class ZebraOutputPartition implements Configurable {
+
+ protected Configuration conf;
+
+ @Override
+ public void setConf( Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public Configuration getConf( ) {
+ return conf;
+ }
+
+  public abstract int getOutputPartition(BytesWritable key, Tuple value)
+ throws IOException;
+}
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraProjection.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraProjection.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraProjection.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraProjection.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapreduce;
+
+/**
+ * A wrapper class for Projection.
+ */
+public class ZebraProjection {
+ private String projectionStr = null;
+
+ private ZebraProjection(String projStr) {
+ projectionStr = projStr;
+ }
+
+ public static ZebraProjection createZebraProjection(String projStr) {
+ return new ZebraProjection(projStr);
+ }
+
+ public String toString() {
+ return projectionStr;
+ }
+}
\ No newline at end of file
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraSchema.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraSchema.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraSchema.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapreduce;
+
+import org.apache.hadoop.zebra.schema.Schema;
+
+/**
+ * A wrapper class for Schema.
+ */
+public class ZebraSchema {
+ private String schemaStr = null;
+
+ private ZebraSchema(String str) {
+ String normalizedStr = Schema.normalize(str);
+ schemaStr = normalizedStr;
+ }
+
+ public static ZebraSchema createZebraSchema(String str) {
+ return new ZebraSchema(str);
+ }
+
+ public String toString() {
+ return schemaStr;
+ }
+}
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraSortInfo.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraSortInfo.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraSortInfo.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraSortInfo.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapreduce;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.zebra.tfile.TFile;
+
+/**
+ * A wrapper class for SortInfo.
+ */
+public class ZebraSortInfo {
+ private String sortColumnsStr = null;
+ private String comparatorStr = null;
+
+ private ZebraSortInfo(String sortColumns, Class<? extends RawComparator<Object>> comparatorClass) {
+ if (comparatorClass != null)
+ comparatorStr = TFile.COMPARATOR_JCLASS+comparatorClass.getName();
+ sortColumnsStr = sortColumns;
+ }
+
+ public static ZebraSortInfo createZebraSortInfo(String sortColumns, Class<? extends RawComparator<Object>> comparatorClass) {
+ return new ZebraSortInfo(sortColumns, comparatorClass);
+ }
+
+ public String getSortColumns() {
+ return sortColumnsStr;
+ }
+
+ public String getComparator() {
+ return comparatorStr;
+ }
+}
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraStorageHint.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraStorageHint.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraStorageHint.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraStorageHint.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapreduce;
+
+/**
+ * A wrapper class for StorageHint.
+ */
+public class ZebraStorageHint {
+ private String storageHintStr = null;
+
+ private ZebraStorageHint(String shStr) {
+ storageHintStr = shStr;
+ }
+
+ public static ZebraStorageHint createZebraStorageHint(String shStr) {
+ return new ZebraStorageHint(shStr);
+ }
+
+ public String toString() {
+ return storageHintStr;
+ }
+}
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/package-info.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/package-info.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/package-info.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Provides {@link org.apache.hadoop.mapreduce.InputFormat} and
+ * {@link org.apache.hadoop.mapreduce.OutputFormat} adaptor classes for Hadoop
+ * Zebra tables.
+ *
+ * This package is based on org.apache.hadoop.zebra.mapred, adapted to the
+ * Hadoop 0.20 (org.apache.hadoop.mapreduce) API. The original package is now
+ * deprecated.
+ */
+package org.apache.hadoop.zebra.mapreduce;
+
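
A sketch of how these adaptor classes plug into a Hadoop 0.20-style driver;
everything below is stock Hadoop API except TableInputFormat, and the
table-specific setup (input paths, projection) is deliberately left as a
comment rather than guessed:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.zebra.mapreduce.TableInputFormat;

    public class ZebraDriverSketch {
        public static void main(String[] args) throws Exception {
            Job job = new Job(new Configuration(), "zebra-read");
            job.setJarByClass(ZebraDriverSketch.class);
            // Wire the Zebra adaptor in as the job's InputFormat;
            // mapper/reducer classes and Zebra input-path/projection
            // configuration would be added here.
            job.setInputFormatClass(TableInputFormat.class);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
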
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/package-info.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/package-info.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/package-info.java Sat Feb 13 00:06:15 2010
@@ -39,16 +39,16 @@
* index-based access).
* <p>
* Typically, applications use
- * {@link org.apache.hadoop.zebra.mapred.BasicTableOutputFormat} (which implements
+ * {@link org.apache.hadoop.zebra.mapreduce.BasicTableOutputFormat} (which implements
* the Hadoop {@link org.apache.hadoop.mapreduce.OutputFormat} interface) to create
* BasicTables through MapReduce. And they use
- * {@link org.apache.hadoop.zebra.mapred.TableInputFormat} (which implements the
+ * {@link org.apache.hadoop.zebra.mapreduce.TableInputFormat} (which implements the
* Hadoop {@link org.apache.hadoop.mapreduce.InputFormat} interface) to feed the data as their
* MapReduce input.
* <p>
* The API is structured in three packages:
* <UL>
- * <LI> {@link org.apache.hadoop.zebra.mapred} : The MapReduce layer. It contains
+ * <LI> {@link org.apache.hadoop.zebra.mapreduce} : The MapReduce layer. It contains
* two classes: BasicTableOutputFormat for creating BasicTables; and
* TableInputFormat for reading tables.
*
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/SchemaConverter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/SchemaConverter.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/SchemaConverter.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/SchemaConverter.java Sat Feb 13 00:06:15 2010
@@ -18,98 +18,163 @@
package org.apache.hadoop.zebra.pig;
-import java.util.Iterator;
-
import org.apache.hadoop.zebra.parser.ParseException;
import org.apache.hadoop.zebra.schema.ColumnType;
+import org.apache.hadoop.zebra.schema.Schema.ColumnSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.data.DataType;
+import org.apache.pig.ResourceSchema;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
class SchemaConverter {
- public static ColumnType toTableType(byte ptype)
- {
- ColumnType ret;
- switch (ptype) {
- case DataType.INTEGER:
- ret = ColumnType.INT;
- break;
- case DataType.LONG:
- ret = ColumnType.LONG;
- break;
- case DataType.FLOAT:
- ret = ColumnType.FLOAT;
- break;
- case DataType.DOUBLE:
- ret = ColumnType.DOUBLE;
- break;
- case DataType.BOOLEAN:
- ret = ColumnType.BOOL;
- break;
- case DataType.BAG:
- ret = ColumnType.COLLECTION;
- break;
- case DataType.MAP:
- ret = ColumnType.MAP;
- break;
- case DataType.TUPLE:
- ret = ColumnType.RECORD;
- break;
- case DataType.CHARARRAY:
- ret = ColumnType.STRING;
- break;
- case DataType.BYTEARRAY:
- ret = ColumnType.BYTES;
- break;
- default:
- ret = null;
+ public static ColumnType toTableType(byte ptype) {
+ ColumnType ret;
+ switch (ptype) {
+ case DataType.INTEGER:
+ ret = ColumnType.INT;
+ break;
+ case DataType.LONG:
+ ret = ColumnType.LONG;
+ break;
+ case DataType.FLOAT:
+ ret = ColumnType.FLOAT;
+ break;
+ case DataType.DOUBLE:
+ ret = ColumnType.DOUBLE;
+ break;
+ case DataType.BOOLEAN:
+ ret = ColumnType.BOOL;
+ break;
+ case DataType.BAG:
+ ret = ColumnType.COLLECTION;
+ break;
+ case DataType.MAP:
+ ret = ColumnType.MAP;
+ break;
+ case DataType.TUPLE:
+ ret = ColumnType.RECORD;
+ break;
+ case DataType.CHARARRAY:
+ ret = ColumnType.STRING;
+ break;
+ case DataType.BYTEARRAY:
+ ret = ColumnType.BYTES;
+ break;
+ default:
+ ret = null;
break;
+ }
+ return ret;
+ }
+
+ public static Schema toPigSchema(
+ org.apache.hadoop.zebra.schema.Schema tschema)
+ throws FrontendException {
+ Schema ret = new Schema();
+ for (String col : tschema.getColumns()) {
+ org.apache.hadoop.zebra.schema.Schema.ColumnSchema columnSchema =
+ tschema.getColumn(col);
+ if (columnSchema != null) {
+ ColumnType ct = columnSchema.getType();
+ if (ct == org.apache.hadoop.zebra.schema.ColumnType.RECORD ||
+ ct == org.apache.hadoop.zebra.schema.ColumnType.COLLECTION)
+ ret.add(new FieldSchema(col, toPigSchema(columnSchema.getSchema()), ct.pigDataType()));
+ else
+ ret.add(new FieldSchema(col, ct.pigDataType()));
+ } else {
+ ret.add(new FieldSchema(null, null));
+ }
+ }
+ return ret;
}
- return ret;
- }
-
- public static Schema toPigSchema(
- org.apache.hadoop.zebra.schema.Schema tschema)
- throws FrontendException {
- Schema ret = new Schema();
- for (String col : tschema.getColumns()) {
- org.apache.hadoop.zebra.schema.Schema.ColumnSchema columnSchema =
- tschema.getColumn(col);
- if (columnSchema != null) {
- ColumnType ct = columnSchema.getType();
- if (ct == org.apache.hadoop.zebra.schema.ColumnType.RECORD ||
- ct == org.apache.hadoop.zebra.schema.ColumnType.COLLECTION)
- ret.add(new FieldSchema(col, toPigSchema(columnSchema.getSchema()), ct.pigDataType()));
- else
- ret.add(new FieldSchema(col, ct.pigDataType()));
- } else {
- ret.add(new FieldSchema(null, null));
- }
- }
- return ret;
- }
-
- public static org.apache.hadoop.zebra.schema.Schema fromPigSchema(
- Schema pschema) throws FrontendException, ParseException {
- org.apache.hadoop.zebra.schema.Schema tschema = new org.apache.hadoop.zebra.schema.Schema();
- Schema.FieldSchema columnSchema;
- for (int i = 0; i < pschema.size(); i++) {
- columnSchema = pschema.getField(i);
- if (columnSchema != null) {
- if (DataType.isSchemaType(columnSchema.type))
- tschema.add(new org.apache.hadoop.zebra.schema.Schema.ColumnSchema(columnSchema.alias,
- fromPigSchema(columnSchema.schema), toTableType(columnSchema.type)));
- else if (columnSchema.type == DataType.MAP)
- tschema.add(new org.apache.hadoop.zebra.schema.Schema.ColumnSchema(columnSchema.alias,
- new org.apache.hadoop.zebra.schema.Schema(new org.apache.hadoop.zebra.schema.Schema.ColumnSchema(null,
- org.apache.hadoop.zebra.schema.ColumnType.BYTES)), toTableType(columnSchema.type)));
- else
- tschema.add(new org.apache.hadoop.zebra.schema.Schema.ColumnSchema(columnSchema.alias, toTableType(columnSchema.type)));
- } else {
- tschema.add(new org.apache.hadoop.zebra.schema.Schema.ColumnSchema(null, ColumnType.ANY));
- }
+
+ public static org.apache.hadoop.zebra.schema.Schema fromPigSchema(
+ Schema pschema) throws FrontendException, ParseException {
+ org.apache.hadoop.zebra.schema.Schema tschema = new org.apache.hadoop.zebra.schema.Schema();
+ Schema.FieldSchema columnSchema;
+ for (int i = 0; i < pschema.size(); i++) {
+ columnSchema = pschema.getField(i);
+ if (columnSchema != null) {
+ if (DataType.isSchemaType(columnSchema.type))
+ tschema.add(new org.apache.hadoop.zebra.schema.Schema.ColumnSchema(columnSchema.alias,
+ fromPigSchema(columnSchema.schema), toTableType(columnSchema.type)));
+ else if (columnSchema.type == DataType.MAP)
+ tschema.add(new org.apache.hadoop.zebra.schema.Schema.ColumnSchema(columnSchema.alias,
+ new org.apache.hadoop.zebra.schema.Schema(new org.apache.hadoop.zebra.schema.Schema.ColumnSchema(null,
+ org.apache.hadoop.zebra.schema.ColumnType.BYTES)), toTableType(columnSchema.type)));
+ else
+ tschema.add(new org.apache.hadoop.zebra.schema.Schema.ColumnSchema(columnSchema.alias, toTableType(columnSchema.type)));
+ } else {
+ tschema.add(new org.apache.hadoop.zebra.schema.Schema.ColumnSchema(null, ColumnType.ANY));
+ }
+ }
+ return tschema;
}
- return tschema;
- }
+
+ public static org.apache.hadoop.zebra.schema.Schema convertFromResourceSchema(ResourceSchema rSchema)
+ throws ParseException {
+ if( rSchema == null )
+ return null;
+
+ org.apache.hadoop.zebra.schema.Schema schema = new org.apache.hadoop.zebra.schema.Schema();
+ ResourceSchema.ResourceFieldSchema[] fields = rSchema.getFields();
+ for( ResourceSchema.ResourceFieldSchema field : fields ) {
+ String name = field.getName();
+ ColumnType type = toTableType( field.getType() );
+ org.apache.hadoop.zebra.schema.Schema cSchema = convertFromResourceSchema( field.getSchema() );
+ if( type == ColumnType.MAP && cSchema == null ) {
+ cSchema = new org.apache.hadoop.zebra.schema.Schema();
+ cSchema.add( new org.apache.hadoop.zebra.schema.Schema.ColumnSchema( "", ColumnType.BYTES ) );
+ }
+ org.apache.hadoop.zebra.schema.Schema.ColumnSchema columnSchema =
+ new org.apache.hadoop.zebra.schema.Schema.ColumnSchema( name, cSchema, type );
+ schema.add( columnSchema );
+ }
+
+ return schema;
+ }
+
+ public static ResourceSchema convertToResourceSchema(org.apache.hadoop.zebra.schema.Schema tSchema) {
+ if( tSchema == null )
+ return null;
+
+ ResourceSchema rSchema = new ResourceSchema();
+ int fieldCount = tSchema.getNumColumns();
+ ResourceFieldSchema[] rFields = new ResourceFieldSchema[fieldCount];
+ for( int i = 0; i < fieldCount; i++ ) {
+ org.apache.hadoop.zebra.schema.Schema.ColumnSchema cSchema = tSchema.getColumn( i );
+ if( cSchema != null )
+ rFields[i] = convertToResourceFieldSchema( cSchema );
+ else
+ rFields[i] = new ResourceFieldSchema();
+ }
+ rSchema.setFields( rFields );
+ return rSchema;
+ }
+
+ public static ResourceFieldSchema convertToResourceFieldSchema(
+ ColumnSchema cSchema) {
+ ResourceFieldSchema field = new ResourceFieldSchema();
+
+ if( cSchema.getType() == ColumnType.COLLECTION && cSchema.getSchema().getNumColumns() > 1 ) {
+ field.setType( ColumnType.RECORD.pigDataType() );
+ field.setSchema( convertToResourceSchema( cSchema.getSchema() ) );
+ } else if( cSchema.getType() == ColumnType.ANY && cSchema.getName().isEmpty() ) { // For anonymous column
+ field.setName( null );
+ field.setType( DataType.UNKNOWN );
+ field.setSchema( null );
+ } else {
+ field.setName( cSchema.getName() );
+ field.setType( cSchema.getType().pigDataType() );
+ if( cSchema.getType() == ColumnType.MAP )
+ field.setSchema( null );
+ else
+ field.setSchema( convertToResourceSchema( cSchema.getSchema() ) );
+ }
+
+ return field;
+ }
+
}
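
A round-trip sketch of the conversions above, placed in
org.apache.hadoop.zebra.pig because SchemaConverter is package-private; the
two-field schema is an invented example:

    package org.apache.hadoop.zebra.pig;

    import org.apache.pig.data.DataType;
    import org.apache.pig.impl.logicalLayer.schema.Schema;

    public class SchemaConverterSketch {
        public static void main(String[] args) throws Exception {
            Schema pigSchema = new Schema();
            pigSchema.add(new Schema.FieldSchema("word", DataType.CHARARRAY));
            pigSchema.add(new Schema.FieldSchema("count", DataType.LONG));

            // CHARARRAY maps to STRING and LONG to LONG per toTableType().
            org.apache.hadoop.zebra.schema.Schema zSchema =
                SchemaConverter.fromPigSchema(pigSchema);

            // toPigSchema() reconstructs an equivalent Pig schema.
            Schema roundTrip = SchemaConverter.toPigSchema(zSchema);
            System.out.println(roundTrip);
        }
    }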