You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/01/22 22:17:22 UTC

svn commit: r614325 [2/6] - in /incubator/pig/branches/types: ./ lib/ scripts/ src/org/apache/pig/ src/org/apache/pig/builtin/ src/org/apache/pig/data/ src/org/apache/pig/impl/ src/org/apache/pig/impl/builtin/ src/org/apache/pig/impl/eval/ src/org/apac...

Modified: incubator/pig/branches/types/src/org/apache/pig/data/DataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataBag.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DataBag.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DataBag.java Tue Jan 22 13:17:12 2008
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *	 http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,319 +17,102 @@
  */
 package org.apache.pig.data;
 
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Collection;
 import java.util.Iterator;
-import java.util.List;
-
-import org.apache.pig.impl.eval.EvalSpec;
-import org.apache.pig.impl.eval.collector.DataCollector;
-
-
-/**
- * A collection of Data values of a given type.  For performance reasons
- * types are not checked on add or read.
- */
-public class DataBag extends ComplexDatum {
-	
-/**
- * Create an empty bag of the indicated type.
- * @param elementType what this will be a bag of.
- */
-public DataBag(Datum.DataType elementType)
-{
-	mContent = new ArrayList<Datum>();
-	mElementType = elementType;
-}
-
-/**
- * Create a bag based on a list.  The element type will be taken from the
- * first Datum in the list.
- * @param c list of data to place in the bag.  This list cannot be empty.
- * @throws IOException if the list is empty.
- */
-public DataBag(List<Datum> c) throws IOException
-{
-	mContent = c;
-	if (c.size() == 0) {
-		throw new IOException("Attempt to instantiate empty bag with no type.");
-	}
-	mElementType = c.get(0).getType();
-}
-
-/**
- * Create a bag given a single datum.  The element type will be taken from
- * the datum.
- * @param t The datum to use.  The datum must not be java null (pig null
- * is ok).
- * @throws IOException if the datum is null.
- */
-public DataBag(Datum t) throws IOException
-{
-	if (t == null) {
-		throw new IOException("Attempt to instantiate empty bag with no type.");
-	}
-	mContent = new ArrayList<Datum>();
-	mContent.add(t);
-	mElementType = t.getType();
-}
-
-/**
- * @return BAG
- */
-public DataType getType() { return Datum.DataType.BAG; }
-
-/**
- * Find out what this is a bag of.
- * @return datatype
- */
-public DataType bagOf() { return mElementType; }
-
-public long size() { return mContent.size(); }
-
-/**
- * @deprecated Use size() instead.
- */
-public int cardinality() { return (int)size(); }
-
-/**
- * Checks if the size of the bag is empty.
- */
-public boolean isEmpty() { return mContent.size() == 0; }
-	
-public int compareTo(Object other)
-{
-	if (!(other instanceof Datum)) return -1;
-
-	Datum od = (Datum)other;
-
-	if (od.getType() != Datum.DataType.BAG) return crossTypeCompare(od);
-
-	DataBag bag = (DataBag)od;
-
-	Datum.DataType dt = bagOf();
-	Datum.DataType dto = bag.bagOf();
-	if (dt != dto) return dt.compareTo(dto);
-
-	long sz = size();
-	long tsz = bag.size();
-	if (sz < tsz) return -1;
-	else if (sz > tsz) return 1;
-
-	Iterator<Datum> i = content();
-	Iterator<Datum> j = bag.content();
-	while (i.hasNext()) {
-		Datum us = i.next();
-		Datum them = j.next();
-		int rc = us.compareTo(them);
-		if (rc != 0) return rc;
-	}
-
-	return 0;
-}
-	
-// Don't make this use compareTo.  These functions are used in things like hashs
-// and we want them to be as fast as possible.
-@Override
-public boolean equals(Object other)
-{
-	if (!(other instanceof DataBag)) return false;
-
-	DataBag bag = (DataBag)other;
-
-	long sz = size();
-
-	if (bagOf() != bag.bagOf()) return false;
-	if (bag.size() != sz) return false;
-
-	Iterator<Datum> i = content();
-	Iterator<Datum> j = bag.content();
-	while (i.hasNext()) {
-		Datum us = i.next();
-		Datum them = j.next();
-		if (!us.equals(them)) return false;
-	}
-
-	return true;
-}
-	
-public void sort()
-{
-	Collections.sort(mContent);
-	mIsSorted = true;
-}
-	
-public void sort(EvalSpec spec)
-{
-	Collections.sort(mContent, spec.getComparator());
-	mIsSorted = true;
-}
-	
-public void arrange(EvalSpec spec)
-{
-	sort(spec);
-	mIsSorted = true;
-}
-	
-public void distinct()
-{
-	// ARG!!!! We're sorting the whole thing and then doing a distinct.  Need to
-	// change this to do distinct during sort.
-	Collections.sort(mContent);
-	mIsSorted = true;
-		
-	Tuple lastTup = null;
-	for (Iterator<Datum> it = mContent.iterator(); it.hasNext(); ) {
-		Tuple thisTup = (Tuple)it.next();
-			
-		if (lastTup == null) {
-			lastTup = thisTup;
-			continue;
-		}
-			
-		if (thisTup.compareTo(lastTup) == 0) {
-			it.remove();
-		} else {
-			lastTup = thisTup;
-		}
-	}
-}
-
-/**
- * Get an iterator to the contents of the bag.  The iterator is an
- * iterator of Datum.  If something else is expected the caller will have to
- * cast it.
- */
-public Iterator<Datum> content() { return mContent.iterator(); }
-
-/**
- * Add a datum to the bag.  The datatype of the datum should match the
- * result of bagOf(), but that will not be checked in the interest of
- * speed.  Would like this method to be final, but BigDataBag overrides it.
- */
-public void add(Datum e)
-{
-	if (e != null) mContent.add(e);
-}
-
-/**
- * Add the contents of a bag to the bag.  The datatype of the data should match the
- * result of bagOf(), but that will not be checked in the interest of
- * speed.
- */
-public final void addAll(DataBag b)
-{
-	Iterator<Datum> it = b.content();
-	while (it.hasNext()) {
-		add(it.next());
-	}
-}
-
-/**
- * Remove a particular datum from the bag.  This operation will be slow
- * and should not be used much.
- */
-public void remove(Datum d) { mContent.remove(d); }
-
-/**
- * Returns the value of field i. Since there may be more than one tuple in the bag, this
- * function throws an exception if it is not the case that all tuples agree on this field
- */
-/*
-public DataAtom getField(int i) throws IOException
-{
-	DataAtom val = null;
-
-	for (Iterator<Tuple> it = mContent(); it.hasNext();) {
-		DataAtom currentVal = it.next().getAtomField(i);
-
-		if (val == null) {
-			val = currentVal;
-		} else {
-			if (!val.strval().equals(currentVal.strval()))
-				throw new IOException("Cannot call getField on a databag unless all tuples agree.");
-		}
-	}
+import java.util.ArrayList;
 
-	if (val == null)
-		throw new IOException("Cannot call getField on an empty databag.");
+import org.apache.hadoop.io.WritableComparable;
 
-	return val;
-}
-*/
+import org.apache.pig.impl.util.Spillable;
+import org.apache.pig.impl.mapreduceExec.PigMapReduce;
 
 /**
- * Empty the bag of its contents.  It retains they type of bag it is.
+ * A collection of Tuples.  A DataBag may or may not fit into memory.
+ * DataBag extends spillable, which means that it registers with a memory
+ * manager.  By default, it attempts to keep all of its contents in memory.
+ * If it is asked by the memory manager to spill to disk (by a call to
+ * spill()), it takes whatever it has in memory, opens a spill file, and
+ * writes the contents out.  This may happen multiple times.  The bag
+ * tracks all of the files it's spilled to.
+ * 
+ * DataBag provides an Iterator interface, that allows callers to read
+ * through the contents.  The iterators are aware of the data spilling.
+ * They have to be able to handle reading from files, as well as the fact
+ * that data they were reading from memory may have been spilled to disk
+ * underneath them.
+ *
+ * The DataBag interface assumes that all data is written before any is
+ * read.  That is, a DataBag cannot be used as a queue.  If data is written
+ * after data is read, the results are undefined.  This condition is not
+ * checked on each add or read, for reasons of speed.  Caveat emptor.
+ *
+ * Since spills are asynchronous (the memory manager requesting a spill
+ * runs in a separate thread), all operations dealing with the mContents
+ * Collection (which is the collection of tuples contained in the bag) have
+ * to be synchronized.  This means that reading from a DataBag is currently
+ * serialized.  This is ok for the moment because pig execution is
+ * currently single threaded.  A ReadWriteLock was experimented with, but
+ * it was found to be about 10x slower than using the synchronize keyword.
+ * If pig changes its execution model to be multithreaded, we may need to
+ * return to this issue, as synchronizing reads will most likely defeat the
+ * purpose of multi-threading execution.
+ *
+ * DataBag come in several types, default, sorted, and distinct.  The type
+ * must be chosen up front, there is no way to convert a bag on the fly.
  */
-public void clear()
-{
-	mContent.clear();
-	mIsSorted = false;
-}
-	
-@Override
-public void write(DataOutput out) throws IOException
-{
-	 out.write(Datum.DataType.BAG.getMarker());
-	 // Now write out the element type, so the reader knows what kind of bag to
-	 // instantiate.
-	 out.write(mElementType.getMarker());
-	 out.writeLong(size());
-	 Iterator<Datum> it = content();
-	 while (it.hasNext()) {
-		 Datum item = it.next();
-		 item.write(out);
-	 }	
-}
-	
-public static abstract class BagDelimiterTuple extends Tuple{}
-public static class StartBag extends BagDelimiterTuple{}
-	
-public static class EndBag extends BagDelimiterTuple{}
-	
-public static final Tuple startBag = new StartBag();
-public static final Tuple endBag = new EndBag();
-	
-static DataBag read(DataInput in) throws IOException
-{
-	DataType etype = Datum.DataType.markerToType(in.readByte());
-
-	long size = in.readLong();
-	DataBag ret = new DataBag(etype);
-	// TODO
-	//DataBag ret = BagFactory.getInstance().getNewBag();
-		
-	for (int i = 0; i < size; i++) {
-		ret.add(DatumImpl.readDatum(in));
-	}
-	return ret;
-}
-	
-public void markStale(boolean stale){}
-	
-@Override
-public String toString() {
-	StringBuffer sb = new StringBuffer();
-	sb.append('{');
-	Iterator<Datum> it = content();
-	while ( it.hasNext() ) {
-		Datum t = it.next();
-		String s = t.toString();
-		sb.append(s);
-		if (it.hasNext())
-			sb.append(", ");
-	}
-	sb.append('}');
-	return sb.toString();
-}
-	
-public boolean mIsSorted(){ return mIsSorted; }
-	
-protected boolean mIsSorted = false;
-protected List<Datum> mContent;
-protected Datum.DataType mElementType;
-
+public interface DataBag extends Spillable, WritableComparable, Iterable<Tuple> {
+    /**
+     * Get the number of elements in the bag, both in memory and on disk.
+     */
+    long size();
+
+    /**
+     * Find out if the bag is sorted.
+     */
+    boolean isSorted();
+    
+    /**
+     * Find out if the bag is distinct.
+     */
+    boolean isDistinct();
+    
+    /**
+     * Get an iterator to the bag. For default and distinct bags,
+     * no particular order is guaranteed. For sorted bags the order
+     * is guaranteed to be sorted according
+     * to the provided comparator.
+     */
+    Iterator<Tuple> iterator();
+
+    /**
+     * Add a tuple to the bag.
+     * @param t tuple to add.
+     */
+    void add(Tuple t);
+
+    /**
+     * Add contents of a bag to the bag.
+     * @param b bag to add contents of.
+     */
+    void addAll(DataBag b);
+
+    /**
+     * Clear out the contents of the bag, both on disk and in memory.
+     * Any attempts to read after this is called will produce undefined
+     * results.
+     */
+    void clear();
+
+    /**
+     * This is used by FuncEvalSpec.FakeDataBag.
+     * @param stale Set stale state.
+     */
+    void markStale(boolean stale);
 }

Added: incubator/pig/branches/types/src/org/apache/pig/data/DataType.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataType.java?rev=614325&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DataType.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DataType.java Tue Jan 22 13:17:12 2008
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.lang.Class;
+import java.lang.reflect.Type;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A class of static final values used to encode data type.  This could be
+ * done as an enumeration, but it done as byte codes instead to save
+ * creating objects.  A few utility functions are also included.
+ */
+public class DataType {
+    // IMPORTANT! This list can be used to record values of data on disk,
+    // so do not change the values.  You may strand user data.
+    // IMPORTANT! Order matters here, as compare() below uses the order to
+    // order unlink datatypes.  Don't change this ordering.
+    // Spaced unevenly to leave room for new entries without changing
+    // values or creating order issues.  
+    public static final byte UNKNOWN   =   0;
+    public static final byte NULL      =   1;
+    public static final byte BOOLEAN   =   5;
+    public static final byte INTEGER   =  10;
+    public static final byte LONG      =  15;
+    public static final byte FLOAT     =  20;
+    public static final byte DOUBLE    =  25;
+    public static final byte BYTEARRAY =  50;
+    public static final byte CHARARRAY =  55;
+    public static final byte MAP       = 100;
+    public static final byte TUPLE     = 110;
+    public static final byte BAG       = 120;
+    public static final byte ERROR     =  -1;
+
+    /**
+     * Determine the datatype of an object.
+     * @param o Object to test.
+     * @return byte code of the type, or ERROR if we don't know.
+     */
+    public static byte findType(Object o) {
+        if (o == null) return NULL;
+
+        // Try to put the most common first
+        if (o instanceof DataByteArray) return BYTEARRAY;
+        else if (o instanceof String) return CHARARRAY;
+        else if (o instanceof Tuple) return TUPLE;
+        else if (o instanceof DataBag) return BAG;
+        else if (o instanceof Integer) return INTEGER;
+        else if (o instanceof Long) return LONG;
+        else if (o instanceof Map) return MAP;
+        else if (o instanceof Float) return FLOAT;
+        else if (o instanceof Double) return DOUBLE;
+        else if (o instanceof Boolean) return BOOLEAN;
+        else return ERROR;
+    }
+
+    /**
+     * Given a Type object determine the data type it represents.  This isn't
+     * cheap, as it uses reflection, so use sparingly.
+     * @param t Type to examine
+     * @return byte code of the type, or ERROR if we don't know.
+     */
+    public static byte findType(Type t) {
+        if (t == null) return NULL;
+
+        // Try to put the most common first
+        if (t == DataByteArray.class) return BYTEARRAY;
+        else if (t == String.class) return CHARARRAY;
+        else if (t == Integer.class) return INTEGER;
+        else if (t == Long.class) return LONG;
+        else if (t == Float.class) return FLOAT;
+        else if (t == Double.class) return DOUBLE;
+        else if (t == Boolean.class) return BOOLEAN;
+        else {
+            // Might be a tuple or a bag, need to check the interfaces it
+            // implements
+            if (t instanceof Class) {
+                Class c = (Class)t;
+                Class[] interfaces = c.getInterfaces();
+                for (int i = 0; i < interfaces.length; i++) {
+                    if (interfaces[i].getName().equals("org.apache.pig.data.Tuple")) {
+                        return TUPLE;
+                    } else if (interfaces[i].getName().equals("org.apache.pig.data.DataBag")) {
+                        return BAG;
+                    } else if (interfaces[i].getName().equals("java.util.Map")) {
+                        return MAP;
+                    }
+                }
+            }
+            return ERROR;
+        }
+    }
+
+    /**
+     * Get the type name.
+     * @param o Object to test.
+     * @return type name, as a String.
+     */
+    public static String findTypeName(Object o) {
+        byte dt = findType(o);
+        switch (dt) {
+        case NULL:      return "NULL";
+        case BOOLEAN:   return "boolean";
+        case INTEGER:   return "integer";
+        case LONG:      return "long";
+        case FLOAT:     return "float";
+        case DOUBLE:    return "double";
+        case BYTEARRAY: return "bytearray";
+        case CHARARRAY: return "chararray";
+        case MAP:       return "map";
+        case TUPLE:     return "tuple";
+        case BAG:       return "bag";
+        default: return "Unknown";
+        }
+    }
+
+    /**
+     * Determine whether the this data type is complex.
+     * @param dataType Data type code to test.
+     * @return true if dataType is bag, tuple, or map.
+     */
+    public static boolean isComplex(byte dataType) {
+        return ((dataType == BAG) || (dataType == TUPLE) ||
+            (dataType == MAP));
+    }
+
+    /**
+     * Determine whether the object is complex or atomic.
+     * @param o Object to determine type of.
+     * @return true if dataType is bag, tuple, or map.
+     */
+    public static boolean isComplex(Object o) {
+        return isComplex(findType(o));
+    }
+
+    /**
+     * Determine whether the this data type is atomic.
+     * @param dataType Data type code to test.
+     * @return true if dataType is bytearray, chararray, integer, long,
+     * float, or boolean.
+     */
+    public static boolean isAtomic(byte dataType) {
+        return ((dataType == BYTEARRAY) || (dataType == CHARARRAY) ||
+            (dataType == INTEGER) || (dataType == LONG) || 
+            (dataType == FLOAT) || (dataType == BOOLEAN));
+    }
+
+    /**
+     * Determine whether the this data type is atomic.
+     * @param o Object to determine type of.
+     * @return true if dataType is bytearray, chararray, integer, long,
+     * float, or boolean.
+     */
+    public static boolean isAtomic(Object o) {
+        return isAtomic(findType(o));
+    }
+
+    /**
+     * Compare two objects to each other.  This function is necessary
+     * because there's no super class that implements compareTo.  This
+     * function provides an (arbitrary) ordering of objects of different
+     * types as follows:  NULL &lt; BOOLEAN &lt; INTEGER &lt; LONG &lt;
+     * FLOAT &lt; DOUBLE * &lt; BYTEARRAY &lt; STRING &lt; MAP &lt;
+     * TUPLE &lt; BAG.  No other functions should implement this cross
+     * object logic.  They should call this function for it instead.
+     * @param o1 First object
+     * @param o2 Second object
+     * @return -1 if o1 is less, 0 if they are equal, 1 if o2 is less.
+     */
+    public static int compare(Object o1, Object o2) {
+        byte dt1 = findType(o1);
+        byte dt2 = findType(o2);
+
+        if (dt1 == dt2) {
+            switch (dt1) {
+            case NULL:
+                return 0;
+
+            case BOOLEAN:
+                return ((Boolean)o1).compareTo((Boolean)o2);
+
+            case INTEGER:
+                return ((Integer)o1).compareTo((Integer)o2);
+
+            case LONG:
+                return ((Long)o1).compareTo((Long)o2);
+
+            case FLOAT:
+                return ((Float)o1).compareTo((Float)o2);
+
+            case DOUBLE:
+                return ((Double)o1).compareTo((Double)o2);
+
+            case BYTEARRAY:
+                return ((DataByteArray)o1).compareTo((DataByteArray)o2);
+
+            case CHARARRAY:
+                return ((String)o1).compareTo((String)o2);
+
+            case MAP: {
+                Map<Object, Object> m1 = (Map<Object, Object>)o1;
+                Map<Object, Object> m2 = (Map<Object, Object>)o2;
+                int sz1 = m1.size();
+                int sz2 = m2.size();
+                if (sz1 < sz2) {
+                    return -1;
+                } else if (sz1 > sz2) { 
+                    return 1;
+                } else {
+                    Iterator<Map.Entry<Object, Object> > i1 =
+                        m1.entrySet().iterator();
+                    Iterator<Map.Entry<Object, Object> > i2 =
+                        m2.entrySet().iterator();
+                    while (i1.hasNext()) {
+                        // This isn't real meaningful, as there are no
+                        // guarantees on iteration order in a map.  But it
+                        // makes more sense than iterating through one and
+                        // probing the other, which will almost always
+                        // result in missing keys in the second and thus
+                        // not provide communativity.
+                        Map.Entry<Object, Object> entry1 = i1.next();
+                        Map.Entry<Object, Object> entry2 = i2.next();
+                        int c = compare(entry1.getKey(), entry2.getKey());
+                        if (c != 0) {
+                            return c;
+                        } else {
+                            c = compare(entry1.getValue(), entry2.getValue());
+                            if (c != 0) {
+                                return c;
+                            }
+                        } 
+                    }
+                    return 0;
+                }
+                      }
+
+            case TUPLE:
+                return ((Tuple)o1).compareTo((Tuple)o2);
+
+            case BAG:
+                return ((DataBag)o1).compareTo((DataBag)o2);
+
+            default:
+                throw new RuntimeException("Unkown type " + dt1 +
+                    " in compare");
+            }
+        } else if (dt1 < dt2) {
+            return -1;
+        } else {
+            return 1;
+        }
+    }
+
+}

Added: incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java?rev=614325&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java Tue Jan 22 13:17:12 2008
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.ArrayList;
+
+import org.apache.pig.impl.util.Spillable;
+import org.apache.pig.impl.mapreduceExec.PigMapReduce;
+
+/**
+ * A collection of Tuples.  A DataBag may or may not fit into memory.
+ * DataBag extends spillable, which means that it registers with a memory
+ * manager.  By default, it attempts to keep all of its contents in memory.
+ * If it is asked by the memory manager to spill to disk (by a call to
+ * spill()), it takes whatever it has in memory, opens a spill file, and
+ * writes the contents out.  This may happen multiple times.  The bag
+ * tracks all of the files it's spilled to.
+ * 
+ * DataBag provides an Iterator interface, that allows callers to read
+ * through the contents.  The iterators are aware of the data spilling.
+ * They have to be able to handle reading from files, as well as the fact
+ * that data they were reading from memory may have been spilled to disk
+ * underneath them.
+ *
+ * The DataBag interface assumes that all data is written before any is
+ * read.  That is, a DataBag cannot be used as a queue.  If data is written
+ * after data is read, the results are undefined.  This condition is not
+ * checked on each add or read, for reasons of speed.  Caveat emptor.
+ *
+ * Since spills are asynchronous (the memory manager requesting a spill
+ * runs in a separate thread), all operations dealing with the mContents
+ * Collection (which is the collection of tuples contained in the bag) have
+ * to be synchronized.  This means that reading from a DataBag is currently
+ * serialized.  This is ok for the moment because pig execution is
+ * currently single threaded.  A ReadWriteLock was experimented with, but
+ * it was found to be about 10x slower than using the synchronize keyword.
+ * If pig changes its execution model to be multithreaded, we may need to
+ * return to this issue, as synchronizing reads will most likely defeat the
+ * purpose of multi-threading execution.
+ *
+ * DataBag come in several types, default, sorted, and distinct.  The type
+ * must be chosen up front, there is no way to convert a bag on the fly.
+ * 
+ * This is the default implementation.  Users are free to provide their
+ * own implementation, but they should keep in mind the need to support
+ * bags that do not fit in memory, and handle spilling in an efficient
+ * manner.
+ */
+public abstract class DefaultAbstractBag implements DataBag {
+    // Container that holds the tuples. Actual object instantiated by
+    // subclasses.
+    protected Collection<Tuple> mContents;
+
+    // Spill files we've created.  These need to be removed in finalize.
+    protected ArrayList<File> mSpillFiles;
+
+    // Total size, including tuples on disk.  Stored here so we don't have
+    // to run through the disk when people ask.
+    protected long mSize = 0;
+
+    protected boolean mMemSizeChanged = false;
+
+    protected long mMemSize = 0;
+
+    /**
+     * Get the number of elements in the bag, both in memory and on disk.
+     */
+    public long size() {
+        return mSize;
+    }
+
+    /**
+     * Add a tuple to the bag.
+     * @param t tuple to add.
+     */
+    public void add(Tuple t) {
+        synchronized (mContents) {
+            mMemSizeChanged = true;
+            mSize++;
+            mContents.add(t);
+        }
+    }
+
+    /**
+     * Add contents of a bag to the bag.
+     * @param b bag to add contents of.
+     */
+    public void addAll(DataBag b) {
+        synchronized (mContents) {
+            mMemSizeChanged = true;
+            mSize += b.size();
+            Iterator<Tuple> i = b.iterator();
+            while (i.hasNext()) mContents.add(i.next());
+        }
+    }
+
+    /**
+     * Add contents of a container to the bag.
+     * @param c Collection to add contents of.
+     */
+    public void addAll(Collection<Tuple> c) {
+        synchronized (mContents) {
+            mMemSizeChanged = true;
+            mSize += c.size();
+            Iterator<Tuple> i = c.iterator();
+            while (i.hasNext()) mContents.add(i.next());
+        }
+    }
+
+    /**
+     * Return the size of memory usage.
+     */
+    @Override
+    public long getMemorySize() {
+        if (!mMemSizeChanged) return mMemSize;
+
+        long used = 0;
+        // I can't afford to talk through all the tuples every time the
+        // memory manager wants to know if it's time to dump.  Just sample
+        // the first 100 and see what we get.  This may not be 100%
+        // accurate, but it's just an estimate anyway.
+        int j;
+        int numInMem = 0;
+        synchronized (mContents) {
+            numInMem = mContents.size();
+            // Measure only what's in memory, not what's on disk.
+            Iterator<Tuple> i = mContents.iterator();
+            for (j = 0; i.hasNext() && j < 100; j++) { 
+                used += i.next().getMemorySize();
+            }
+        }
+
+        if (numInMem > 100) {
+            // Estimate the per tuple size.  Do it in integer arithmetic
+            // (even though it will be slightly less accurate) for speed.
+            used /= j;
+            used *= numInMem;
+        }
+
+        mMemSize = used;
+        mMemSizeChanged = false;
+        return used;
+    }
+
+    /**
+     * Clear out the contents of the bag, both on disk and in memory.
+     * Any attempts to read after this is called will produce undefined
+     * results.
+     */
+    public void clear() {
+        synchronized (mContents) {
+            mContents.clear();
+            if (mSpillFiles != null) {
+                for (int i = 0; i < mSpillFiles.size(); i++) {
+                    mSpillFiles.get(i).delete();
+                }
+                mSpillFiles.clear();
+            }
+            mSize = 0;
+        }
+    }
+
+    /**
+     * This method is potentially very expensive since it may require a
+     * sort of the bag; don't call it unless you have to.
+     */
+    public int compareTo(Object other) {
+        if (this == other)
+            return 0;
+        if (other instanceof DataBag) {
+            DataBag bOther = (DataBag) other;
+            if (this.size() != bOther.size()) {
+                if (this.size() > bOther.size()) return 1;
+                else return -1;
+            }
+
+            // Ugh, this is bogus.  But I have to know if two bags have the
+            // same tuples, regardless of order.  Hopefully most of the
+            // time the size check above will prevent this.
+            // If either bag isn't already sorted, create a sorted bag out
+            // of it so I can guarantee order.
+            DataBag thisClone;
+            DataBag otherClone;
+            if (this instanceof SortedDataBag ||
+                    this instanceof DistinctDataBag) {
+                thisClone = this;
+            } else {
+                thisClone = new SortedDataBag(null);
+                Iterator<Tuple> i = iterator();
+                while (i.hasNext()) thisClone.add(i.next());
+            }
+            if (other instanceof SortedDataBag ||
+                    this instanceof DistinctDataBag) {
+                otherClone = bOther;
+            } else {
+                otherClone = new SortedDataBag(null);
+                Iterator<Tuple> i = bOther.iterator();
+                while (i.hasNext()) otherClone.add(i.next());
+            }
+            Iterator<Tuple> thisIt = thisClone.iterator();
+            Iterator<Tuple> otherIt = otherClone.iterator();
+            while (thisIt.hasNext() && otherIt.hasNext()) {
+                Tuple thisT = thisIt.next();
+                Tuple otherT = otherIt.next();
+                
+                int c = thisT.compareTo(otherT);
+                if (c != 0) return c;
+            }
+            
+            return 0;   // if we got this far, they must be equal
+        } else {
+            return DataType.compare(this, other);
+        }
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        return compareTo(other) == 0;
+    }
+
+    /**
+     * Write a bag's contents to disk.
+     * @param out DataOutput to write data to.
+     * @throws IOException (passes it on from underlying calls).
+     */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        // We don't care whether this bag was sorted or distinct because
+        // using the iterator to write it will guarantee those things come
+        // correctly.  And on the other end there'll be no reason to waste
+        // time re-sorting or re-applying distinct.
+        out.write(DataType.BAG);
+        out.writeLong(size());
+        Iterator<Tuple> it = iterator();
+        while (it.hasNext()) {
+            Tuple item = it.next();
+            item.write(out);
+        }    
+    }
+ 
+    /**
+     * Read a bag from disk.
+     * @param in DataInput to read data from.
+     * @throws IOException (passes it on from underlying calls).
+     */
+    public void readFields(DataInput in) throws IOException {
+        long size = in.readLong();
+        
+        for (long i = 0; i < size; i++) {
+            Object o = DataReaderWriter.readDatum(in);
+            add((Tuple)o);
+        }
+    }
+
+    /**
+     * This is used by FuncEvalSpec.FakeDataBag.
+     * @param stale Set stale state.
+     */
+    public void markStale(boolean stale)
+    {
+    }
+
+    /**
+     * Write the bag into a string. */
+    @Override
+    public String toString() {
+        StringBuffer sb = new StringBuffer();
+        sb.append('{');
+        Iterator<Tuple> it = iterator();
+        while ( it.hasNext() ) {
+            Tuple t = it.next();
+            String s = t.toString();
+            sb.append(s);
+            if (it.hasNext()) sb.append(", ");
+        }
+        sb.append('}');
+        return sb.toString();
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 1;
+        Iterator<Tuple> i = iterator();
+        while (i.hasNext()) {
+            // Use 37 because we want a prime, and tuple uses 31.
+            hash = 37 * hash + i.next().hashCode();
+        }
+        return hash;
+    }
+
+    /**
+     * Need to override finalize to clean out the mSpillFiles array.
+     */
+    @Override
+    protected void finalize() {
+        if (mSpillFiles != null) {
+            for (int i = 0; i < mSpillFiles.size(); i++) {
+                mSpillFiles.get(i).delete();
+            }
+        }
+    }
+
+    /**
+     * Get a file to spill contents to.  The file will be registered in the
+     * mSpillFiles array.
+     * @return stream to write tuples to.
+     */
+    protected DataOutputStream getSpillFile() throws IOException {
+        if (mSpillFiles == null) {
+            // We want to keep the list as small as possible.
+            mSpillFiles = new ArrayList<File>(1);
+        }
+
+        File f = File.createTempFile("pigbag", null);
+        f.deleteOnExit();
+        mSpillFiles.add(f);
+        return new DataOutputStream(new BufferedOutputStream(
+            new FileOutputStream(f)));
+    }
+
+    /**
+     * Report progress to HDFS.
+     */
+    protected void reportProgress() {
+        if (PigMapReduce.reporter != null) {
+            PigMapReduce.reporter.progress();
+        }
+    }
+
+    public static abstract class BagDelimiterTuple extends DefaultTuple{}
+    public static class StartBag extends BagDelimiterTuple{}
+    
+    public static class EndBag extends BagDelimiterTuple{}
+    
+    public static final Tuple startBag = new StartBag();
+    public static final Tuple endBag = new EndBag();
+
+    protected static final int MAX_SPILL_FILES = 100;
+ 
+}

Added: incubator/pig/branches/types/src/org/apache/pig/data/DefaultBagFactory.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DefaultBagFactory.java?rev=614325&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DefaultBagFactory.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DefaultBagFactory.java Tue Jan 22 13:17:12 2008
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import org.apache.pig.impl.eval.EvalSpec;
+import org.apache.pig.impl.util.SpillableMemoryManager;
+
+/**
+ * A bag factory.  Can be used to generate different types of bags
+ * depending on what is needed.
+ */
+public class DefaultBagFactory extends BagFactory {
+    /**
+     * Get a default (unordered, not distinct) data bag.
+     */
+    public DataBag newDefaultBag() {
+        DataBag b = new DefaultDataBag();
+        registerBag(b);
+        return b;
+    }
+
+    /**
+     * Get a sorted data bag.
+     * @param spec EvalSpec that controls how the data is sorted.
+     * If null, default comparator will be used.
+     */
+    public DataBag newSortedBag(EvalSpec spec) {
+        DataBag b = new SortedDataBag(spec);
+        registerBag(b);
+        return b;
+    }
+    
+    /**
+     * Get a distinct data bag.
+     */
+    public DataBag newDistinctBag() {
+        DataBag b = new DistinctDataBag();
+        registerBag(b);
+        return b;
+    }
+
+    DefaultBagFactory() {
+        super();
+    }
+
+}
+

Copied: incubator/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java (from r610055, incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java)
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java?p2=incubator/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java&p1=incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java&r1=610055&r2=614325&rev=614325&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java Tue Jan 22 13:17:12 2008
@@ -35,7 +35,9 @@
  * are stored in an ArrayList, since there is no concern for order or
  * distinctness.
  */
-public class DefaultDataBag extends DataBag {
+public class DefaultDataBag extends DefaultAbstractBag {
+
+    private static TupleFactory gTupleFactory = TupleFactory.getInstance();
 
     public DefaultDataBag() {
         mContents = new ArrayList<Tuple>();
@@ -162,7 +164,7 @@
                         "Unable to find our spill file", fnfe);
                     throw new RuntimeException(fnfe);
                 }
-                Tuple t = new Tuple();
+                Tuple t = gTupleFactory.newTuple();
                 for (int i = 0; i < mMemoryPtr; i++) {
                     try {
                         t.readFields(mIn);
@@ -195,7 +197,7 @@
         private Tuple readFromFile() {
             if (mIn != null) {
                 // We already have a file open
-                Tuple t = new Tuple();
+                Tuple t = gTupleFactory.newTuple();
                 try {
                     t.readFields(mIn);
                     return t;

Added: incubator/pig/branches/types/src/org/apache/pig/data/DefaultTupleFactory.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DefaultTupleFactory.java?rev=614325&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DefaultTupleFactory.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DefaultTupleFactory.java Tue Jan 22 13:17:12 2008
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A bag factory.  Can be used to generate different types of bags
+ * depending on what is needed.
+ */
+public class DefaultTupleFactory extends TupleFactory {
+    public Tuple newTuple() {
+        return new DefaultTuple();
+    
+    }
+
+    public Tuple newTuple(int size) {
+        return new DefaultTuple(size);
+    }
+    
+    public Tuple newTuple(List c) {
+        return new DefaultTuple(c);
+    }
+
+    public Tuple newTuple(Object datum) {
+        Tuple t = new DefaultTuple(1);
+        try {
+            t.set(0, datum);
+        } catch (IOException e) {
+            // The world has come to an end, we just allocated a tuple with one slot
+            // but we can't write to that slot.
+            throw new RuntimeException("Unable to write to field 0 in newly " +
+                "allocated tuple of size 1!", e);
+        }
+        return t;
+    }
+
+    DefaultTupleFactory() {
+    }
+
+}
+

Copied: incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java (from r610055, incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java)
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java?p2=incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java&p1=incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java&r1=610055&r2=614325&rev=614325&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java Tue Jan 22 13:17:12 2008
@@ -50,7 +50,9 @@
  * ArrayList and then sorted.  Dispite all these machinations, this was
  * found to be faster than storing it in a TreeSet.
  */
-public class DistinctDataBag extends DataBag {
+public class DistinctDataBag extends DefaultAbstractBag {
+    private static TupleFactory gTupleFactory = TupleFactory.getInstance();
+
     public DistinctDataBag() {
         mContents = new HashSet<Tuple>();
     }
@@ -246,7 +248,7 @@
 
                 // Fast foward past the tuples we've already put in the
                 // queue.
-                Tuple t = new Tuple();
+                Tuple t = gTupleFactory.newTuple();
                 for (int i = 0; i < mMemoryPtr; i++) {
                     try {
                         t.readFields(in);
@@ -357,7 +359,7 @@
             DataInputStream in = mStreams.get(fileNum);
             if (in != null) {
                 // There's still data in this file
-                c.tuple = new Tuple();
+                c.tuple = gTupleFactory.newTuple();
                 do {
                     try {
                         c.tuple.readFields(in);

Modified: incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java Tue Jan 22 13:17:12 2008
@@ -24,7 +24,7 @@
 /**
  * This is an internal class that keeps track of the specific input that a Tuple came from
  */
-public class IndexedTuple extends Tuple {
+public class IndexedTuple extends DefaultTuple {
 
 	public int index = -1;
 	
@@ -32,7 +32,9 @@
 	}
 	
 	public IndexedTuple(Tuple t, int indexIn) {
-		super(t);
+        // Have to do it like this because Tuple is an interface, we don't
+        // have access to its internal structures.
+        super(t.getAll());
 		index = indexIn;
 	}
 
@@ -45,19 +47,15 @@
 	@Override
 	public void write(DataOutput out) throws IOException {
 		super.write(out);
-		out.writeInt(index);
-		//encodeInt(out, index);
+        out.writeInt(index);
 	}
 	@Override
 	public void readFields(DataInput in) throws IOException {
 		super.readFields(in);
 		index = in.readInt();
-		//index = decodeInt(in);
 	}
 	
 	public Tuple toTuple(){
-		Tuple t = new Tuple();
-		t.mFields = mFields;
-		return t;
+        return TupleFactory.getInstance().newTuple(mFields);
 	}
 }

Copied: incubator/pig/branches/types/src/org/apache/pig/data/SortedDataBag.java (from r610055, incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java)
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/SortedDataBag.java?p2=incubator/pig/branches/types/src/org/apache/pig/data/SortedDataBag.java&p1=incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java&r1=610055&r2=614325&rev=614325&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/SortedDataBag.java Tue Jan 22 13:17:12 2008
@@ -48,7 +48,9 @@
  * We allow a user defined comparator, but provide a default comparator in
  * cases where the user doesn't specify one.
  */
-public class SortedDataBag extends DataBag {
+public class SortedDataBag extends DefaultAbstractBag {
+    private static TupleFactory gTupleFactory = TupleFactory.getInstance();
+
     private Comparator<Tuple> mComp;
     private boolean mReadStarted = false;
 
@@ -244,7 +246,7 @@
 
                 // Fast foward past the tuples we've already put in the
                 // queue.
-                Tuple t = new Tuple();
+                Tuple t = gTupleFactory.newTuple();
                 for (int i = 0; i < mMemoryPtr; i++) {
                     try {
                         t.readFields(in);
@@ -351,7 +353,7 @@
             DataInputStream in = mStreams.get(fileNum);
             if (in != null) {
                 // There's still data in this file
-                c.tuple = new Tuple();
+                c.tuple = gTupleFactory.newTuple();
                 try {
                     c.tuple.readFields(in);
                     mMergeQ.add(c);

Modified: incubator/pig/branches/types/src/org/apache/pig/data/TimestampedTuple.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/TimestampedTuple.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/TimestampedTuple.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/TimestampedTuple.java Tue Jan 22 13:17:12 2008
@@ -21,7 +21,7 @@
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 
-public class TimestampedTuple extends Tuple {
+public class TimestampedTuple extends DefaultTuple {
 
     protected double timestamp = 0;      // timestamp of this tuple
     protected boolean heartbeat = false;  // true iff this is a heartbeat (i.e. purpose is just to convey new timestamp; carries no data)
@@ -42,13 +42,14 @@
         super(numFields);
     }
     
+    /*
     public TimestampedTuple(String textLine, String delimiter, int timestampColumn, 
     						SimpleDateFormat dateFormat){
         if (delimiter == null) {
             delimiter = defaultDelimiter;
         }
         String[] splitString = textLine.split(delimiter, -1);
-        mFields = new ArrayList<Datum>(splitString.length-1);
+        fields = new ArrayList<Datum>(splitString.length-1);
         for (int i = 0; i < splitString.length; i++) {
         	if (i==timestampColumn){
         		try{
@@ -57,10 +58,11 @@
         			System.err.println("Could not parse timestamp " + splitString[i]);
         		}
         	}else{
-        		mFields.add(new DataAtom(splitString[i]));
+        		fields.add(new DataAtom(splitString[i]));
         	}
         }
     }
+    */
 
     
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/data/Tuple.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/Tuple.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/Tuple.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/Tuple.java Tue Jan 22 13:17:12 2008
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *	 http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,568 +17,107 @@
  */
 package org.apache.pig.data;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
-import org.apache.log4j.Logger;
-
 import org.apache.hadoop.io.WritableComparable;
 
-import org.apache.pig.impl.util.PigLogger;
-
-/**
- * an ordered list of Data
- */
-public class Tuple extends ComplexDatum implements WritableComparable
-{
-
-static String			  defaultDelimiter = "[,\t]";
-static String			  NULL = "__PIG_NULL__";
-
-public static final byte RECORD_1  = 0x21;
-public static final byte RECORD_2  = 0x31;
-public static final byte RECORD_3  = 0x41;
-
-/**
- * Construct a tuple with no fields.
- */
-public Tuple() { this(0); }
-
-/**
- * Construct a tuple with a known number of fields.  The fields will be
- * pre-populated with nulls.
- * @param numFields Number of fields in the tuple.
- */
-public Tuple(long numFields)
-{
-	this(numFields, true);
-}
-
-/**
- * Construct a tuple with a knwn number of fields.
- * @param numFields Number of fields in the tuple.
- * @param prepopulate If true, prepopulate with nulls, otherwise leave the
- * tuple empty.  Should be called with false only by Tuple.read()
- */
-private Tuple(long numFields, boolean prepopulate)
-{
-	mFields = new ArrayList<Datum>((int)numFields);
-	if (prepopulate) {
-		for (int i = 0; i < numFields; i++) {
-			mFields.add(null);
-		}
-	}
-}
-
-/**
- * Construct a tuple from an existing List.
- */
-public Tuple(List<Datum> mFieldsIn)
-{
-	mFields = new ArrayList<Datum>(mFieldsIn.size());
-	mFields.addAll(mFieldsIn);
-}
-
-/**
- * Construct a tuple from an existing tuple.  The fields are not copied,
- * only referenced.
- */
-public Tuple(Tuple t)
-{
-	mFields = new ArrayList<Datum>(t.mFields.size());
-	mFields.addAll(t.mFields);
-}
-	
-/**
- * shortcut, if tuple only has one field
- */
-public Tuple(Datum fieldIn)
-{
-	mFields = new ArrayList<Datum>(1);
-	mFields.add(fieldIn);
-}
-
-/**
- * Creates a tuple from a delimited line of text.  For now creates elements as
- * DataAtoms.  This should change once we have expressions that can handle the
- * new types.
- * 
- * @param textLine
- *			the line containing mFields of data
- * @param delimiter
- *			a regular expression of the form specified by String.split(). If null, the default
- *			delimiter "[,\t]" will be used.
- */
-public Tuple(String textLine, String delimiter)
-{
-	if (delimiter == null) {
-		delimiter = defaultDelimiter;
-	}
-	String[] splitString = textLine.split(delimiter, -1);
-	mFields = new ArrayList<Datum>(splitString.length);
-	for (int i = 0; i < splitString.length; i++) {
-		mFields.add(new DataAtom(splitString[i]));
-	}
-}
-
-/**
- * Creates a tuple from a delimited line of text. This will invoke Tuple(textLine, null)
- * 
- * @param textLine
- *			the line containing mFields of data
- */
-public Tuple(String textLine) { this(textLine, defaultDelimiter); }
-
-public Tuple(Tuple[] otherTs)
-{
-	mFields = new ArrayList<Datum>(otherTs.length);
-	for (int i = 0; i < otherTs.length; i++) {
-			appendTuple(otherTs[i]);
-	}
-}
-
-public DataType getType() { return Datum.DataType.TUPLE; }
-
-public long size() { return mFields.size(); }
-
-public void copyFrom(Tuple otherT)
-{
-	this.mFields = otherT.mFields;
-}
-
-/**
- * @deprecated Using size instead.
- */
-public int arity() { return (int)size(); }
-
-@Override
-public String toString()
-{
-	StringBuffer sb = new StringBuffer();
-	sb.append('(');
-	for (Iterator<Datum> it = mFields.iterator(); it.hasNext();) {
-		Datum d = it.next();
-		if(d != null) {
-			sb.append(d.toString());
-		} else {
-			sb.append(NULL);
-		}
-		if (it.hasNext())
-			sb.append(", ");
-	}
-	sb.append(')');
-	String s = sb.toString();
-	return s;
-}
-
-public final void setField(long i, Datum val) throws IOException
-{
-	if (i >= mFields.size()) {
-		throw new IOException("Column number out of range, tried to access " + i + " in a tuple of only " + mFields.size() + "columns");
-	}
-	mFields.set((int)i, val);
-}
-
 /**
- * Set a field with an int value. Push it into a DataAtom for the moment,
- * eventually we'll change this to a DataInteger.
- * @param val integer value to set field to.
- */
-public final void setField(int i, int val) throws IOException
-{
-	setField(i, new DataAtom(val));
-}
-
-/**
- * Set a field with a double value. Push it into a DataAtom for the moment,
- * eventually we'll change this to a DataDouble.
- * @param val double value to set field to.
- */
-public void setField(int i, double val) throws IOException
-{
-	setField(i, new DataAtom(val));
-}
-
-/**
- * Set a field with an string value. Push it into a DataAtom for the moment,
- * eventually we'll change this to a DataCharArrayUtf16.
- * @param val string value to set field to.
- */
-public void setField(int i, String val) throws IOException
-{
-	setField(i, new DataAtom(val));
-}
-
-/**
- * Get a field from the tuple.
- * @param i Field to get
- * @return Field value, as Datum.
- */
-public final Datum getField(long i)
-{
-	Datum d = null;
-	if ((i >= mFields.size()) || ((d = mFields.get((int)i)) == null)) {
-		d = new DataUnknown();
-		d.setNull(true);
-	}
-	return d;
-}
-
-/**
- * @deprecated Use specific types instead of DataAtom.
- * Get field i, if it is an Atom or can be coerced into an Atom.
- * @param i field to get as an atom.
- * @return contents of the field. If its of type DataAtom then that will
- * be returned.  If it's of tuple of one field, then that field will be
- * returned.  If it's a bag of one element, then that element will be
- * returned.  If it's one of the new atomic types (int, etc.) it will push
- * that into a data atom and return that.
- * @throws IOException if the field isn't an atom and it can't figure out
- * how to do the coercion.
- */
-public DataAtom getAtomField(int i) throws IOException
-{
-	Datum field = getField(i); // throws exception if field doesn't exist
-
-	// This shouldn't actually ever happen anymore.
-	if (field instanceof DataAtom) return (DataAtom) field;
-
-	switch (field.getType()) {
-	case INT: return new DataAtom(((DataInteger)field).get());
-	case LONG: return new DataAtom(((DataLong)field).get());
-	case FLOAT: return new DataAtom(((DataFloat)field).get());
-	case DOUBLE: return new DataAtom(((DataDouble)field).get());
-	case UNKNOWN: return new DataAtom(((DataUnknown)field).get());
-	case CHARARRAY: 
-		switch (((DataCharArray)field).getEncoding()) {
-		case UTF16: 
-			return new DataAtom(((DataCharArrayUtf16)field).get());
-		case NONE: 
-			return new DataAtom(((DataCharArrayNone)field).get());
-		default: throw new AssertionError("Unknown encoding");
-		}
-
-	// Can't use getFieldAsAtomic for tuple and bag because these need to
-	// recurse to getAtomField instead.
-	case TUPLE: {
-		Tuple t = (Tuple) field;
-		if (t.size() == 1) {
-			PigLogger.getLogger().warn("Asked for an atom field but found a tuple with one field.");
-			return t.getAtomField(0);
-		}
-		break;
-							   }
-
-	case BAG: {
-		DataBag b = (DataBag) field;
-		if (b.bagOf() == Datum.DataType.TUPLE && b.size() == 1) {
-			Tuple t = (Tuple)b.content().next();
-			if (t.size() == 1) {
-				PigLogger.getLogger().warn("Asked for an atom field but found a bag with a tuple with one field.");
-				return t.getAtomField(0);
-			}
-		}
-		break;
-							 }
-
-	default: break;
-	}
-	throw new IOException("Incompatible type for request getAtomField().");
-}
-
-/**
- * Get field i, if it is an Atomic type or can be coerced into an Atomic
- * type.
- * @param i field to get as an atomic type.
- * @return contents of the field. If its an atomic type it will be
- * returned as is.  If it is unknown, it will be converted to a char array
- * none and returned as that.  If it's of tuple of one field, then
- * that field will be returned.  If it's a bag of one element, then
- * that element will be returned.
- * @throws IOException if the field isn't an atom and it can't figure out
- * how to do the coercion.
- */
-public AtomicDatum getFieldAsAtomic(int i) throws IOException
-{
-	Datum field = getField(i); // throws exception if field doesn't exist
-
-	if (field.getDimension() == Datum.DataDimension.ATOMIC) {
-		return (AtomicDatum)field;
-	}
-
-	switch (field.getType()) {
-	case UNKNOWN:
-		return new DataCharArrayNone(((DataUnknown)field).get());
-
-	case TUPLE: {
-		Tuple t = (Tuple) field;
-		if (t.size() == 1) {
-			PigLogger.getLogger().warn("Warning: Asked for an atom field but found a tuple with one field.");
-			return t.getFieldAsAtomic(0);
-		}
-		break;
-				}
-
-	case BAG: {
-		DataBag b = (DataBag) field;
-		if (b.bagOf() == Datum.DataType.TUPLE && b.size() == 1) {
-			Tuple t = (Tuple)b.content().next();
-			if (t.size() == 1) {
-				PigLogger.getLogger().warn("Warning: Asked for an atom field but found a bag with a tuple with one field.");
-				return t.getFieldAsAtomic(0);
-			}
-		}
-		break;
-			  }
-
-	default: break;
-	}
-
-	throw new IOException("Incompatible type for request getAtomField().");
-}
-
-/**
- * Attempt to fetch a field as a tuple.
- * @param i field number to get.
- * @return If the field is a tuple, return it.  If it's bag of one tuple,
- * return it.  Otherwise... 
- * @throws IOException if the field is neither a tuple nor a bag of one
- * tuple.
- */
-public Tuple getTupleField(int i) throws IOException
-{
-	Datum field = getField(i); // throws exception if field doesn't exist
-
-	if (field.getType() == Datum.DataType.TUPLE) {
-		return (Tuple) field;
-	} else if (field.getType() == Datum.DataType.BAG) {
-		DataBag b = (DataBag) field;
-		if (b.bagOf() == Datum.DataType.TUPLE && b.size() == 1) {
-			return (Tuple)b.content().next();
-		}
-	}
-
-	throw new IOException("Incompatible type for request getTupleField().");
-}
-
-/**
- * Attempt to fetch a field as a bag.
- * @param i field number to get.
- * @return If the field is a bag, return it. Otherwise... 
- * @throws IOException if the field is not a bag.
- */
-public DataBag getBagField(int i) throws IOException
-{
-	Datum field = getField(i); // throws exception if field doesn't exist
-
-	if (field.getType() == Datum.DataType.BAG) return (DataBag) field;
-
-	throw new IOException("Incompatible type for request getBagField().");
-}
-
-public final void appendTuple(Tuple other)
-{
-	for (Iterator<Datum> it = other.mFields.iterator(); it.hasNext();) {
-		mFields.add(it.next());
-	}
-}
-
-public final void appendField(Datum newField) { mFields.add(newField); }
-
-public String toDelimitedString(String delim) throws IOException
-{
-	StringBuffer buf = new StringBuffer();
-	for (Iterator<Datum> it = mFields.iterator(); it.hasNext();) {
-		Datum field = it.next();
-		if (field.getDimension() == Datum.DataDimension.COMPLEX) {
-			throw new IOException("Unable to convert non-flat tuple to string.");
-		}
-
-		buf.append(field.toString());
-		if (it.hasNext()) buf.append(delim);
-	}
-	return buf.toString();
-}
-
-/*
-	public boolean lessThan(Tuple other) {
-		return (this.compareTo(other) < 0);
-	}
-
-	public boolean greaterThan(Tuple other) {
-		return (this.compareTo(other) > 0);
-	}
-	
-	*/
-
-/**
- * See if two tuples are equal to each other.  Tuple equality is defined as being
- * of the same size, and for each field f1...fn in t1 and fields g1...gn in t2,
- * f1.equals(g1) ... fn.equals(gn) holds true.
- * Don't make this use compareTo.  These functions are used in things like hashs
- * and we want them to be as fast as possible.
- */
-@Override
-public boolean equals(Object other)
-{
-	if (!(other instanceof Tuple)) return false;
-
-	Tuple t = (Tuple)other;
-
-	long sz = size();
-
-	if (t.size() != sz) return false;
-
-	for (long i = 0; i < sz; i++) {
-		if (!t.getField(i).equals(getField(i))) return false;
-	}
-
-	return true;
-}
-
-public int compareTo(Object other)
-{
-	if (!(other instanceof Datum)) return -1;
-
-	Datum od = (Datum)other;
-
-	if (od.getType() != Datum.DataType.TUPLE) return crossTypeCompare(od);
-
-	Tuple t = (Tuple)od;
-
-	long sz = size();
-	long tsz = t.size();
-	if (sz < tsz) return -1;
-	else if (sz > tsz) return 1;
-
-	for (long i = 0; i < sz; i++) {
-		int c = mFields.get((int)i).compareTo(t.mFields.get((int)i));
-		if (c != 0) return c;
-	}
-	return 0;
-}
-
-@Override
-public int hashCode()
-{
-	int hash = 1;
-	for (Iterator<Datum> it = mFields.iterator(); it.hasNext();) {
-		Datum f = it.next();
-		if (f == null) hash += 1;
-		else hash = 31 * hash + f.hashCode();
-	}
-	return hash;
-}
-
-// WritableComparable methods:
-   
-@Override
-public void write(DataOutput out) throws IOException
-{
-	out.write(Datum.DataType.TUPLE.getMarker());
-	long n = size();
-	out.writeLong(n);
-	for (long i = 0; i < n; i++) {
-		Datum d = getField((int)i);
-		if (d != null){
-			d.write(out);
-		} else {
-			throw new RuntimeException("Null field in tuple");
-		}
-	}
-}
-
-/**
- * This method is invoked when the beginning 'TUPLE' is still on the stream.
- * @param in DataInput to read from
- * @throws IOExcetion if the expected data isn't a tuple.
+ * An ordered list of Data.  A tuple has fields, numbered 0 through
+ * (number of fields - 1).  The entry in the field can be any datatype,
+ * or it can be null.
+ *
+ * Tuples are constructed only by a TupleFactory.  A DefaultTupleFactory
+ * is provided by the system.  If a user wishes to use their own type of
+ * Tuple, they should also provide an implementation of TupleFactory to
+ * construct their types of Tuples.
+ *
+ * Fields are numbered from 0.
  */
-public void readFields(DataInput in) throws IOException
-{
-	byte[] b = new byte[1];
-	in.readFully(b);
-	if (b[0] != Datum.DataType.TUPLE.getMarker())
-		throw new IOException("Unexpected data while reading tuple from binary file");
-	Tuple t = read(in);
-	mFields = t.mFields;
-}
-	
-//This method is invoked when the beginning 'TUPLE' has been read off the stream
-public static Tuple read(DataInput in) throws IOException
-{
-	long size = in.readLong();
-
-	// nuke the old contents of the tuple
-	Tuple ret = new Tuple(size, false);
-
-	for (int i = 0; i < size; i++) {
-		ret.appendField(DatumImpl.readDatum(in));
-	}
-		
-	return ret;
-}
-	
-	/*
-public static Datum readDatum(DataInput in) throws IOException
-{
-	byte[] b = new byte[1];
-	in.readFully(b);
-	switch (b[0]) {
-	case TUPLE:
-		return Tuple.read(in);
-	case BAG:
-		return DataBag.read(in);
-	case MAP:
-		return DataMap.read(in);
-	case INT:
-		return DataInt.read(in);
-	case LONG:
-		return DataLong.read(in);
-	case FLOAT:
-		return DataFloat.read(in);
-	case DOUBLE:
-		return DataDouble.read(in);
-	case UNKNOWN:
-		return DataUnknown.read(in);
-	default:
-		throw new AssertionError("Invalid data type indicator " + b[0] + " while reading Datum from binary file");
-	}
-}
-
-	//  Encode the integer so that the high bit is set on the last
-	// byte
-	static void encodeInt(DataOutput os, int i) throws IOException {
-		if (i >> 28 != 0)
-			os.write((i >> 28) & 0x7f);
-		if (i >> 21 != 0)
-			os.write((i >> 21) & 0x7f);
-		if (i >> 14 != 0)
-			os.write((i >> 14) & 0x7f);
-		if (i >> 7 != 0)
-			os.write((i >> 7) & 0x7f);
-		os.write((i & 0x7f) | (1 << 7));
-	}
-
-	static int decodeInt(DataInput is) throws IOException {
-		int i = 0;
-		int c;
-		while (true) {
-			c = is.readUnsignedByte();
-			if (c == -1)
-				break;
-			i <<= 7;
-			i += c & 0x7f;
-			if ((c & 0x80) != 0)
-				break;
-		}
-		return i;
-	}
-	*/
-
-protected ArrayList<Datum> mFields;
-
+public interface Tuple extends WritableComparable {
+    /**
+     * Make this tuple reference the contents of another.  This method does not copy
+     * the underlying data.   It maintains references to the data from the original
+     * tuple (and possibly even to the data structure holding the data).
+     * @param t Tuple to reference.
+     */
+    void reference(Tuple t);
+
+    /**
+     * Find the size of the tuple.  Used to be called arity().
+     * @return number of fields in the tuple.
+     */
+    int size();
+
+    /**
+     * Find out if a given field is null.
+     * @param fieldNum Number of field to check for null.
+     * @return true if the field is null, false otherwise.
+     * @throws IOException if the field number given is greater
+     * than or equal to the number of fields in the tuple.
+     */
+    boolean isNull(int fieldNum) throws IOException;
+
+    /**
+     * Find the type of a given field.
+     * @param fieldNum Number of field to get the type for.
+     * @return type, encoded as a byte value.  The values are taken from
+     * the class DataType.  If the field is null, then DataType.UNKNOWN
+     * will be returned.
+     * @throws IOException if the field number is greater than or equal to
+     * the number of fields in the tuple.
+     */
+    byte getType(int fieldNum) throws IOException;
+
+    /**
+     * Get the value in a given field.
+     * @param fieldNum Number of the field to get the value for.
+     * @return value, as an Object.
+     * @throws IOException if the field number is greater than or equal to
+     * the number of fields in the tuple.
+     */
+    Object get(int fieldNum) throws IOException;
+
+    /**
+     * Get all of the fields in the tuple as a list.
+     * @return List&lt;Object&gt; containing the fields of the tuple
+     * in order.
+     */
+    List<Object> getAll();
+
+    /**
+     * Set the value in a given field.
+     * @param fieldNum Number of the field to set the value for.
+     * @param val Object to put in the indicated field.
+     * @throws IOException if the field number is greater than or equal to
+     * the number of fields in the tuple.
+     */
+    void set(int fieldNum, Object val) throws IOException;
+
+    /**
+     * Append a field to a tuple.  This method is not efficient as it may
+     * force copying of existing data in order to grow the data structure.
+     * Whenever possible you should construct your Tuple with the
+     * newTuple(int) method and then fill in the values with set(), rather
+     * than construct it with newTuple() and append values.
+     * @param val Object to append to the tuple.
+     */
+    void append(Object val);
+
+    /**
+     * Determine the size of tuple in memory.  This is used by data bags
+     * to determine their memory size.  This need not be exact, but it
+     * should be a decent estimation.
+     * @return estimated memory size.
+     */
+    long getMemorySize();
+
+    /** 
+     * Write a tuple of atomic values into a string.  All values in the
+     * tuple must be atomic (no bags, tuples, or maps).
+     * @param delim Delimiter to use in the string.
+     * @return A string containing the tuple.
+     * @throws IOException if a non-atomic value is found.
+     */
+    String toDelimitedString(String delim) throws IOException;
 }

Added: incubator/pig/branches/types/src/org/apache/pig/data/TupleFactory.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/TupleFactory.java?rev=614325&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/TupleFactory.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/data/TupleFactory.java Tue Jan 22 13:17:12 2008
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.lang.Class;
+import java.lang.ClassLoader;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.List;
+
+/**
+ * A factory to construct tuples.  This class is abstract so that users can
+ * override the tuple factory if they desire to provide their own that
+ * returns their implementation of a tuple.  If the property
+ * pig.data.tuple.factory.name is set to a class name and
+ * pig.data.tuple.factory.jar is set to a URL pointing to a jar that
+ * contains the above named class, then getInstance() will create a
+ * a instance of the named class using the indicatd jar.  Otherwise, it
+ * will create and instance of DefaultTupleFactory.
+ */
+public abstract class TupleFactory {
+    private static TupleFactory gSelf = null;
+
+    /**
+     * Get a reference to the singleton factory.
+     */
+    public static TupleFactory getInstance() {
+        if (gSelf == null) {
+            String factoryName =
+                System.getProperty("pig.data.tuple.factory.name");
+            String factoryJar =
+                System.getProperty("pig.data.tuple.factory.jar");
+            if (factoryName != null && factoryJar != null) {
+                try {
+                    URL[] urls = new URL[1];
+                    urls[0] = new URL(factoryJar);
+                    ClassLoader loader = new URLClassLoader(urls,
+                        TupleFactory.class.getClassLoader());
+                    Class c = Class.forName(factoryName, true, loader);
+                    Object o = c.newInstance();
+                    if (!(o instanceof TupleFactory)) {
+                        throw new RuntimeException("Provided factory " +
+                            factoryName + " does not extend TupleFactory!");
+                    }
+                    gSelf = (TupleFactory)o;
+                } catch (Exception e) {
+                    if (e instanceof RuntimeException) {
+                        // We just threw this
+                        RuntimeException re = (RuntimeException)e;
+                        throw re;
+                    }
+                    throw new RuntimeException("Unable to instantiate "
+                        + "tuple factory " + factoryName, e);
+                }
+            } else {
+                gSelf = new DefaultTupleFactory();
+            }
+        }
+        return gSelf;
+    }
+    
+    /**
+     * Create an empty tuple.  This should be used as infrequently as
+     * possible, use newTuple(int) instead.
+     */
+    public abstract Tuple newTuple();
+
+    /**
+     * Create a tuple with size fields.  Whenever possible this is prefered
+     * over the nullary constructor, as the constructor can preallocate the
+     * size of the container holding the fields.
+     * @param size Number of fields in the tuple.
+     */
+    public abstract Tuple newTuple(int size);
+    
+    /**
+     * Create a tuple from the provided list of objects.
+     * @param c List of objects to use as the fields of the tuple.
+     */
+    public abstract Tuple newTuple(List c);
+
+    /**
+     * Create a tuple with a single element.  This is useful because of
+     * the fact that bags (currently) only take tuples, we often end up
+     * sticking a single element in a tuple in order to put it in a bag.
+     * @param datum Datum to put in the tuple.
+     */
+    public abstract Tuple newTuple(Object datum);
+
+    protected TupleFactory() {
+    }
+
+    /**
+     * Provided for testing purposes only.  This function should never be
+     * called by anybody but the unit tests.
+     */
+    public static void resetSelf() {
+        gSelf = null;
+    }
+
+}
+

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java Tue Jan 22 13:17:12 2008
@@ -19,6 +19,7 @@
 
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.File;
 import java.io.InputStream;
 import java.io.Serializable;
 import java.lang.reflect.Constructor;
@@ -42,6 +43,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapred.JobSubmissionProtocol;
 import org.apache.hadoop.mapred.JobTracker;
@@ -75,7 +77,7 @@
     transient private JobConf conf = null;        
     
     //  extra jar files that are needed to run a job
-    transient public List<String> extraJars = new LinkedList<String>();              
+    transient public List<URL> extraJars = new LinkedList<URL>();              
     
     //  The jars that should not be merged in. (Some functions may come from pig.jar and we don't want the whole jar file.)
     transient public Vector<String> skipJars = new Vector<String>(2);    
@@ -89,7 +91,6 @@
     //  connection to hadoop jobtracker (stays as null if doing local execution)
     transient private JobSubmissionProtocol jobTracker;                  
     transient private JobClient jobClient;                  
-	transient private Logger                mLogger;
    
     private String jobName = JOB_NAME_PREFIX;	// can be overwritten by users
   
@@ -109,12 +110,6 @@
 	public PigContext(ExecType execType){
 		this.execType = execType;
 		
-		/*
-		mLogger = Logger.getLogger("org.apache.pig");
-		mLogger.setAdditivity(false);
-		*/
-		mLogger = PigLogger.getLogger();
-
     	initProperties();
     	
         String pigJar = JarManager.findContainingJar(Main.class);
@@ -134,6 +129,8 @@
     }
 
 	private void initProperties() {
+        Logger log = PigLogger.getLogger();
+
 	    Properties fileProperties = new Properties();
 	        
 	    try{        
@@ -158,15 +155,16 @@
 	    //Now set these as system properties only if they are not already defined.
 	    for (Object o: fileProperties.keySet()){
 	    	String propertyName = (String)o;
-			mLogger.debug("Found system property " + propertyName + " in .pigrc"); 
+			log.debug("Found system property " + propertyName + " in .pigrc"); 
 	    	if (System.getProperty(propertyName) == null){
 	    		System.setProperty(propertyName, fileProperties.getProperty(propertyName));
-				mLogger.debug("Setting system property " + propertyName);
+				log.debug("Setting system property " + propertyName);
 	    	}
 	    }
 	}    
 	
     public void connect(){
+        Logger log = PigLogger.getLogger();
     	try{
 		if (execType != ExecType.LOCAL){
 		    	//First set the ssh socket factory
@@ -202,10 +200,10 @@
 		     
 	            lfs = FileSystem.getNamed("local", conf);
 	       
-	            mLogger.info("Connecting to hadoop file system at: " + conf.get("fs.default.name"));
+	            log.info("Connecting to hadoop file system at: " + conf.get("fs.default.name"));
 	            dfs = FileSystem.get(conf);
 	        
-	            mLogger.info("Connecting to map-reduce job tracker at: " + conf.get("mapred.job.tracker"));
+	            log.info("Connecting to map-reduce job tracker at: " + conf.get("mapred.job.tracker"));
 	            jobTracker = (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
 	            				JobSubmissionProtocol.versionID, JobTracker.getAddress(conf), conf);
 		    jobClient = new JobClient(conf);
@@ -236,6 +234,7 @@
     };
     
     private String[] doHod(String server) {
+        Logger log = PigLogger.getLogger();
     	if (hodMapRed != null) {
     		return new String[] {hodHDFS, hodMapRed};
     	}
@@ -253,7 +252,10 @@
             cmd.append(System.getProperty("hod.command"));
             //String cmd = System.getProperty("hod.command", "/home/breed/startHOD.expect");
 			String cluster = System.getProperty("yinst.cluster");
-			if (cluster.length() > 0 && !cluster.startsWith("kryptonite")) {
+            // TODO This is a Yahoo specific holdover, need to remove
+            // this.
+			if (cluster != null && cluster.length() > 0 &&
+                    !cluster.startsWith("kryptonite")) {
 				cmd.append(" --config=");
 				cmd.append(System.getProperty("hod.config.dir"));
 				cmd.append('/');
@@ -267,8 +269,8 @@
             	p = fac.ssh(cmd.toString());
             }
             InputStream is = p.getInputStream();
-            mLogger.info("Connecting to HOD...");
-			mLogger.debug("sending HOD command " + cmd.toString());
+            log.info("Connecting to HOD...");
+			log.debug("sending HOD command " + cmd.toString());
             StringBuffer sb = new StringBuffer();
             int c;
             String hdfsUI = null;
@@ -282,23 +284,23 @@
                 	switch(current) {
                 	case HDFSUI:
                 		hdfsUI = sb.toString().trim();
-                		mLogger.info("HDFS Web UI: " + hdfsUI);
+                		log.info("HDFS Web UI: " + hdfsUI);
                 		break;
                 	case HDFS:
                 		hdfs = sb.toString().trim();
-                		mLogger.info("HDFS: " + hdfs);
+                		log.info("HDFS: " + hdfs);
                 		break;
                 	case MAPREDUI:
                 		mapredUI = sb.toString().trim();
-                		mLogger.info("JobTracker Web UI: " + mapredUI);
+                		log.info("JobTracker Web UI: " + mapredUI);
                 		break;
                 	case MAPRED:
                 		mapred = sb.toString().trim();
-                		mLogger.info("JobTracker: " + mapred);
+                		log.info("JobTracker: " + mapred);
                 		break;
 			case HADOOPCONF:
 				hadoopConf = sb.toString().trim();
-                		mLogger.info("HadoopConf: " + hadoopConf);
+                		log.info("HadoopConf: " + hadoopConf);
                 		break;
                 	}
                 	current = ParsingState.NOTHING;
@@ -334,7 +336,7 @@
 	    {
 		conf = new JobConf(hadoopConf);
 		// make sure that files on class path are used
-		conf.addFinalResource("pig-cluster-hadoop-site.xml");
+		conf.addResource("pig-cluster-hadoop-site.xml");
 		System.out.println("Job Conf = " + conf);
 		System.out.println("dfs.block.size= " + conf.get("dfs.block.size"));
 		System.out.println("ipc.client.timeout= " + conf.get("ipc.client.timeout"));
@@ -344,7 +346,7 @@
 		throw new IOException("Missing Hadoop configuration file");
             return new String[] {hdfs, mapred};
         } catch (Exception e) {
-            mLogger.fatal("Could not connect to HOD", e);
+            log.fatal("Could not connect to HOD", e);
             System.exit(4);
         }
         throw new RuntimeException("Could not scrape needed information.");
@@ -372,9 +374,18 @@
 		}
     }
     
-    public void addJar(String path) throws MalformedURLException{
-        extraJars.add(path);
-        LogicalPlanBuilder.classloader = createCl(null);
+    public void addJar(String path) throws MalformedURLException {
+        if (path != null) {
+            URL resource = (new File(path)).toURI().toURL();
+            addJar(resource);
+        }
+    }
+    
+    public void addJar(URL resource) throws MalformedURLException{
+        if (resource != null) {
+            extraJars.add(resource);
+            LogicalPlanBuilder.classloader = createCl(null);
+        }
     }
 
     public void rename(String oldName, String newName) throws IOException {
@@ -410,12 +421,6 @@
         return conf;
     }
 
-	/*
-	public Logger getLogger() {
-		return mLogger;
-	}
-	*/
-
     public void setJobtrackerLocation(String newLocation) {
         conf.set("mapred.job.tracker", newLocation);
     }
@@ -471,7 +476,7 @@
             urls[0] = new URL("file:" + jarFile);
         }
         for (int i = 0; i < extraJars.size(); i++) {
-            urls[i + passedJar] = new URL("file:" + extraJars.get(i));
+            urls[i + passedJar] = extraJars.get(i);
         }
         return new URLClassLoader(urls, PigMapReduce.class.getClassLoader());
     }
@@ -506,7 +511,9 @@
 		// create ClassNotFoundException exception and attach to IOException
 		// so that we don't need to buble interface changes throughout the code
 		ClassNotFoundException e = new ClassNotFoundException("Could not resolve " + name + " using imports: " + packageImportList);
-		throw new IOException(e);
+		IOException newE = new IOException(e.getMessage());
+                newE.initCause(e);
+                throw newE;
     }
     
     private static List<String> parseArguments(String argString){
@@ -548,7 +555,9 @@
 				ret = objClass.newInstance();
 			}
     	}catch(Throwable e){
-    		throw new IOException(e);
+    		IOException newE = new IOException(e.getMessage());
+                newE.initCause(e);
+                throw newE;
     	}
 		return ret;
 	}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/builtin/ADD.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/builtin/ADD.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/builtin/ADD.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/builtin/ADD.java Tue Jan 22 13:17:12 2008
@@ -20,20 +20,19 @@
 import java.io.IOException;
 
 import org.apache.pig.EvalFunc;
-import org.apache.pig.data.DataAtom;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.schema.AtomSchema;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 
 
-public class ADD extends EvalFunc<DataAtom> {
+public class ADD extends EvalFunc<Double> {
 
     @Override
-    public void exec(Tuple input, DataAtom output) throws IOException {
-        double v1 = input.getAtomField(0).numval();
-        double v2 = input.getAtomField(1).numval();
-        
-        output.setValue(v1+v2);
+    public Double exec(Tuple input) throws IOException {
+        double v1 = (Double)input.get(0);
+        double v2 = (Double)input.get(1);
+
+        return new Double(v1 + v2);
     }
     
     @Override

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/builtin/DIVIDE.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/builtin/DIVIDE.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/builtin/DIVIDE.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/builtin/DIVIDE.java Tue Jan 22 13:17:12 2008
@@ -20,17 +20,16 @@
 import java.io.IOException;
 
 import org.apache.pig.EvalFunc;
-import org.apache.pig.data.DataAtom;
 import org.apache.pig.data.Tuple;
 
 
-public class DIVIDE extends EvalFunc<DataAtom> {
+public class DIVIDE extends EvalFunc<Double> {
 
     @Override
-    public void exec(Tuple input, DataAtom output) throws IOException {
-        double v1 = input.getAtomField(0).numval();
-        double v2 = input.getAtomField(1).numval();
-        output.setValue(v1/v2);
+    public Double exec(Tuple input) throws IOException {
+        double v1 = (Double)input.get(0);
+        double v2 = (Double)input.get(1);
+        return new Double(v1/v2);
     }
 
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java Tue Jan 22 13:17:12 2008
@@ -21,36 +21,39 @@
 import java.util.Iterator;
 
 import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.Datum;
 
 
 public class FindQuantiles extends EvalFunc<DataBag>{
-	
-	/**
-	 * first field in the input tuple is the number of quantiles to generate
-	 * second field is the *sorted* bag of samples
-	 */
-	
-	@Override
-	public void exec(Tuple input, DataBag output) throws IOException {
-		int numQuantiles = input.getAtomField(0).numval().intValue();
-		DataBag samples = input.getBagField(1);
-		
-		int numSamples = samples.cardinality();
-		
-		int toSkip = numSamples / numQuantiles;
-		
-		int i=0, nextQuantile = 0;
-		Iterator<Datum> iter = samples.content();
-		while (iter.hasNext()){
-			Tuple t = (Tuple)iter.next();
-			if (i==nextQuantile){
-				output.add(t);
-				nextQuantile+=toSkip+1;
-			}
-			i++;
-		}
-	}
+    BagFactory mBagFactory = BagFactory.getInstance();
+    
+    /**
+     * first field in the input tuple is the number of quantiles to generate
+     * second field is the *sorted* bag of samples
+     */
+    
+    @Override
+    public DataBag exec(Tuple input) throws IOException {
+        Integer numQuantiles = (Integer)input.get(0);
+        DataBag samples = (DataBag)input.get(1);
+        DataBag output = mBagFactory.newDefaultBag();
+        
+        long numSamples = samples.size();
+        
+        long toSkip = numSamples / numQuantiles;
+        
+        long i=0, nextQuantile = 0;
+        Iterator<Tuple> iter = samples.iterator();
+        while (iter.hasNext()){
+            Tuple t = iter.next();
+            if (i==nextQuantile){
+                output.add(t);
+                nextQuantile+=toSkip+1;
+            }
+            i++;
+        }
+        return output;
+    }
 }