You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/01/22 22:17:22 UTC

svn commit: r614325 [1/6] - in /incubator/pig/branches/types: ./ lib/ scripts/ src/org/apache/pig/ src/org/apache/pig/builtin/ src/org/apache/pig/data/ src/org/apache/pig/impl/ src/org/apache/pig/impl/builtin/ src/org/apache/pig/impl/eval/ src/org/apac...

Author: gates
Date: Tue Jan 22 13:17:12 2008
New Revision: 614325

URL: http://svn.apache.org/viewvc?rev=614325&view=rev
Log:
Beginning of rework to add new types using java native objects instead of pig
defined classes.  This code doesn't work yet.  pig.jar compiles, but the
tests don't yet.


Added:
    incubator/pig/branches/types/CHANGES.txt
      - copied unchanged from r610055, incubator/pig/trunk/CHANGES.txt
    incubator/pig/branches/types/lib/hadoop15.jar
      - copied unchanged from r610055, incubator/pig/trunk/lib/hadoop15.jar
    incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java
      - copied unchanged from r610055, incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java
    incubator/pig/branches/types/src/org/apache/pig/data/DataType.java
    incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java
    incubator/pig/branches/types/src/org/apache/pig/data/DefaultBagFactory.java
    incubator/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java
      - copied, changed from r610055, incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java
    incubator/pig/branches/types/src/org/apache/pig/data/DefaultTupleFactory.java
    incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java
      - copied, changed from r610055, incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java
    incubator/pig/branches/types/src/org/apache/pig/data/SortedDataBag.java
      - copied, changed from r610055, incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java
    incubator/pig/branches/types/src/org/apache/pig/data/TupleFactory.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/EvalSpecPrinter.java
      - copied unchanged from r610055, incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpecPrinter.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/EvalSpecVisitor.java
      - copied unchanged from r610055, incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpecVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java
      - copied unchanged from r610055, incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOPrinter.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java
      - copied unchanged from r610055, incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POPrinter.java
      - copied unchanged from r610055, incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POPrinter.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POVisitor.java
      - copied unchanged from r610055, incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/impl/util/Spillable.java
      - copied unchanged from r610055, incubator/pig/trunk/src/org/apache/pig/impl/util/Spillable.java
    incubator/pig/branches/types/src/org/apache/pig/impl/util/SpillableMemoryManager.java
      - copied unchanged from r610055, incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java
    incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java
      - copied unchanged from r610055, incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
    incubator/pig/branches/types/src/org/apache/pig/tools/pigscript/
      - copied from r610055, incubator/pig/trunk/src/org/apache/pig/tools/pigscript/
    incubator/pig/branches/types/src/org/apache/pig/tools/pigscript/parser/   (props changed)
      - copied from r610055, incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/
    incubator/pig/branches/types/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
      - copied unchanged from r610055, incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
    incubator/pig/branches/types/test/org/apache/pig/test/NonDefaultBagFactory.java
    incubator/pig/branches/types/test/org/apache/pig/test/OrdAsc.java
      - copied unchanged from r610055, incubator/pig/trunk/test/org/apache/pig/test/OrdAsc.java
    incubator/pig/branches/types/test/org/apache/pig/test/OrdDesc.java
      - copied unchanged from r610055, incubator/pig/trunk/test/org/apache/pig/test/OrdDesc.java
    incubator/pig/branches/types/test/org/apache/pig/test/OrdDescNumeric.java
      - copied unchanged from r610055, incubator/pig/trunk/test/org/apache/pig/test/OrdDescNumeric.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestOrderBy.java
      - copied unchanged from r610055, incubator/pig/trunk/test/org/apache/pig/test/TestOrderBy.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPigServer.java
      - copied unchanged from r610055, incubator/pig/trunk/test/org/apache/pig/test/TestPigServer.java
Removed:
    incubator/pig/branches/types/lib/hadoop13.jar
    incubator/pig/branches/types/src/org/apache/pig/data/BigDataBag.java
    incubator/pig/branches/types/src/org/apache/pig/data/DataAtom.java
    incubator/pig/branches/types/src/org/apache/pig/data/DataCharArray.java
    incubator/pig/branches/types/src/org/apache/pig/data/DataCharArrayNone.java
    incubator/pig/branches/types/src/org/apache/pig/data/DataCharArrayUtf16.java
    incubator/pig/branches/types/src/org/apache/pig/data/DataDouble.java
    incubator/pig/branches/types/src/org/apache/pig/data/DataFloat.java
    incubator/pig/branches/types/src/org/apache/pig/data/DataInteger.java
    incubator/pig/branches/types/src/org/apache/pig/data/DataLong.java
    incubator/pig/branches/types/src/org/apache/pig/data/DataMap.java
    incubator/pig/branches/types/src/org/apache/pig/data/Datum.java
    incubator/pig/branches/types/src/org/apache/pig/data/DatumImpl.java
    incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.jj
    incubator/pig/branches/types/test/com/
    incubator/pig/branches/types/test/org/apache/pig/test/TestDataCharArrayNone.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestDataCharArrayUtf16.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestDataDouble.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestDataFloat.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestDataInteger.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestDataLong.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestDataMap.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestDataUnknown.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestTuple.java
    incubator/pig/branches/types/test/reports/
Modified:
    incubator/pig/branches/types/   (props changed)
    incubator/pig/branches/types/build.xml
    incubator/pig/branches/types/scripts/pig.pl
    incubator/pig/branches/types/src/org/apache/pig/EvalFunc.java
    incubator/pig/branches/types/src/org/apache/pig/Main.java
    incubator/pig/branches/types/src/org/apache/pig/PigServer.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/ARITY.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/AVG.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/DIFF.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/IsEmpty.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/SUM.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/TOKENIZE.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java
    incubator/pig/branches/types/src/org/apache/pig/data/AmendableTuple.java
    incubator/pig/branches/types/src/org/apache/pig/data/BagFactory.java
    incubator/pig/branches/types/src/org/apache/pig/data/DataBag.java
    incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java
    incubator/pig/branches/types/src/org/apache/pig/data/TimestampedTuple.java
    incubator/pig/branches/types/src/org/apache/pig/data/Tuple.java
    incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java
    incubator/pig/branches/types/src/org/apache/pig/impl/builtin/ADD.java
    incubator/pig/branches/types/src/org/apache/pig/impl/builtin/DIVIDE.java
    incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java
    incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFAny.java
    incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFCross.java
    incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFReplicate.java
    incubator/pig/branches/types/src/org/apache/pig/impl/builtin/MULTIPLY.java
    incubator/pig/branches/types/src/org/apache/pig/impl/builtin/SUBTRACT.java
    incubator/pig/branches/types/src/org/apache/pig/impl/builtin/ShellBagEvalFunc.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/BinCondSpec.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/CompositeEvalSpec.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/ConstSpec.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/EvalSpec.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/FilterSpec.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/FuncEvalSpec.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/GenerateSpec.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/MapLookupSpec.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/ProjectSpec.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/SimpleEvalSpec.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/SortDistinctSpec.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/StarSpec.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/collector/DataCollector.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/collector/FlattenCollector.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/collector/UnflattenCollector.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/AndCond.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/CompCond.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/Cond.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/FalseCond.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/FuncCond.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/NotCond.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/OrCond.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/RegexpCond.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/TrueCond.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/window/TimeWindowSpec.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/window/TupleWindowSpec.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/window/WindowSpec.java
    incubator/pig/branches/types/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java
    incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileReader.java
    incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileWriter.java
    incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java
    incubator/pig/branches/types/src/org/apache/pig/impl/io/PigFile.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEval.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORead.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplit.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStore.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/   (props changed)
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigCombine.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigInputFormat.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigMapReduce.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigSplit.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/SortPartitioner.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/IntermedResult.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POCogroup.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POEval.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POLoad.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POMapreduce.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PORead.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSort.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSplitMaster.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSplitSlave.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POStore.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POUnion.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalPlan.java
    incubator/pig/branches/types/src/org/apache/pig/impl/util/DataBuffer.java
    incubator/pig/branches/types/src/org/apache/pig/impl/util/JarManager.java
    incubator/pig/branches/types/src/org/apache/pig/impl/util/PigLogger.java
    incubator/pig/branches/types/src/org/apache/pig/tools/grunt/   (props changed)
    incubator/pig/branches/types/test/   (props changed)
    incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestDataBag.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPigFile.java
    incubator/pig/branches/types/test/org/apache/pig/test/Util.java

Propchange: incubator/pig/branches/types/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Jan 22 13:17:12 2008
@@ -0,0 +1,4 @@
+
+dist
+depend
+pig.jar

Modified: incubator/pig/branches/types/build.xml
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/build.xml (original)
+++ incubator/pig/branches/types/build.xml Tue Jan 22 13:17:12 2008
@@ -5,11 +5,12 @@
 	<property name="lib.dir" value="${basedir}/lib" />
 	<property name="src.dir" value="${basedir}/src" />
 	<property name="doc.dir" value="${basedir}/doc" />
+        <property name="test.reports.dir" value="${basedir}/test/reports" />
 	<property name="shock.src.dir" value="${basedir}/lib-src/shock" />
 	<property name="bzip2.src.dir" value="${basedir}/lib-src/bzip2" />
 	<property name="test.src.dir" value="${basedir}/test" />
 	<property name="output.jarfile" value="pig.jar" />
-	<property name="hadoop.jarfile" value="hadoop14.jar"/>
+	<property name="hadoop.jarfile" value="hadoop15.jar"/>
 	<property name="ssh.gateway" value=""/>
 	<property name="hod.server" value=""/>
 	<property name="hod.command" value=""/>
@@ -26,6 +27,7 @@
 	<target name="clean">
 		<delete dir="${dist.dir}" />
 		<delete dir="${doc.dir}" />
+                <delete dir="${test.reports.dir}" />
 		<delete file="${output.jarfile}" />
 		<delete file="${src.dir}/org/apache/pig/impl/logicalLayer/parser/QueryParser.java" />
 		<delete file="${src.dir}/org/apache/pig/impl/logicalLayer/parser/JJTQueryParserState.java" />
@@ -39,24 +41,32 @@
 		<delete file="${src.dir}/org/apache/pig/impl/logicalLayer/parser/SimpleNode.java" />
 		<delete file="${src.dir}/org/apache/pig/impl/logicalLayer/parser/Token.java" />
 		<delete file="${src.dir}/org/apache/pig/impl/logicalLayer/parser/TokenMgrError.java" />
-		<delete file="${src.dir}/org/apache/pig/tools/grunt/GruntParser.java" />
-		<delete file="${src.dir}/org/apache/pig/tools/grunt/GruntParserConstants.java" />
-		<delete file="${src.dir}/org/apache/pig/tools/grunt/GruntParserTokenManager.java" />
-		<delete file="${src.dir}/org/apache/pig/tools/grunt/ParseException.java" />
-		<delete file="${src.dir}/org/apache/pig/tools/grunt/SimpleCharStream.java" />
-		<delete file="${src.dir}/org/apache/pig/tools/grunt/Token.java" />
-		<delete file="${src.dir}/org/apache/pig/tools/grunt/TokenMgrError.java" />
+		<delete file="${src.dir}/org/apache/pig/tools/pigscript/parser/PigScriptParser.java" />
+		<delete file="${src.dir}/org/apache/pig/tools/pigscript/parser/PigScriptParserConstants.java" />
+		<delete file="${src.dir}/org/apache/pig/tools/pigscript/parser/PigScriptParserTokenManager.java" />
+		<delete file="${src.dir}/org/apache/pig/tools/pigscript/parser/ParseException.java" />
+		<delete file="${src.dir}/org/apache/pig/tools/pigscript/parser/SimpleCharStream.java" />
+		<delete file="${src.dir}/org/apache/pig/tools/pigscript/parser/Token.java" />
+		<delete file="${src.dir}/org/apache/pig/tools/pigscript/parser/TokenMgrError.java" />
 	</target>
 
-	<target name="compile" depends="cc-compile">
+    <target name="depend">
+        <mkdir dir="depend"/>
+        <echo>*** Resolving dependencies ***</echo>
+        <depend srcdir="src;lib-src/shock;lib-src/bzip2" destdir="dist" cache="depend"/>
+    </target>
+
+	<target name="compile" depends="depend, cc-compile, lib-compile">
 		<mkdir dir="${dist.dir}" />
 		<echo>*** Building Main Sources ***</echo>
-		<javac srcdir="${src.dir};${shock.src.dir};${bzip2.src.dir}" destdir="${dist.dir}" target="1.5" debug="on">
+		<javac srcdir="${src.dir};${shock.src.dir};${bzip2.src.dir}" destdir="${dist.dir}" target="1.5" debug="on" deprecation="on">
 			<classpath refid="classpath" />
+			<!--<compilerarg value="-Xlint:unchecked"/> -->
 		</javac>
 		<echo>*** Building Test Sources ***</echo>
 		<javac srcdir="test" destdir="${dist.dir}" debug="on">
 			<classpath refid="classpath" />
+			<compilerarg value="-Xlint:unchecked"/>
 		</javac>
 	</target>
 
@@ -70,9 +80,17 @@
 			outputdirectory="${src.dir}/org/apache/pig/impl/logicalLayer/parser"
 			javacchome="${basedir}/lib" />
 		<javacc
-			target="${src.dir}/org/apache/pig/tools/grunt/GruntParser.jj" 
-			outputdirectory="${src.dir}/org/apache/pig/tools/grunt"
+			target="${src.dir}/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj" 
+			outputdirectory="${src.dir}/org/apache/pig/tools/pigscript/parser"
 			javacchome="${basedir}/lib" />
+	</target>
+
+        <target name="lib-compile">
+		<mkdir dir="${dist.dir}" />
+		<echo>*** Building Library Sources ***</echo>
+		<javac srcdir="${shock.src.dir};${bzip2.src.dir}" destdir="${dist.dir}" target="1.5" debug="on">
+			<classpath refid="classpath" />
+		</javac>
 	</target>
 
 	<target name="jar" depends="compile">

Modified: incubator/pig/branches/types/scripts/pig.pl
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/scripts/pig.pl?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/scripts/pig.pl (original)
+++ incubator/pig/branches/types/scripts/pig.pl Tue Jan 22 13:17:12 2008
@@ -19,7 +19,7 @@
 # Read our configuration file.  This will fill in values for pigJarRoot
 # and hodRoot.
 our $ROOT = (defined($ENV{'ROOT'}) ? $ENV{'ROOT'} : "/home/y");
-my ($pigJarRoot, $hodRoot);
+my ($pigJarRoot, $hodRoot, $defaultCluster);
 
 open(CFG, "< $ROOT/conf/pigclient.conf") or
 	die "Can't open $ROOT/conf/pigclient.conf, $ERRNO\n";
@@ -67,6 +67,16 @@
 
 if (defined $classpath)
 {
+    # Check to make sure that the jar file specified in the class path is
+    # available.
+    $classpath =~ /(^|:)([^:]*pig.jar)($|:)/;
+    my $jar = $2;
+    if (!(-e $jar)) {
+        die "I can't find the jar file $jar.  If you explicitly
+put this jar in your classpath, please check that you have the path name
+correct.  If you specified a cluster via -c[luster], then the pig jar for
+that cluster is not present on this machine.\n";
+    }
 	push (@javaArgs, "-cp", $classpath);
 }
 
@@ -89,10 +99,10 @@
 
 # If we aren't attaching to kryptonite, set up the right hod config file.
 if ($cluster ne "kryptonite") {
-	my $hodCfg = "$hodRoot/conf/$cluster";
-	if (-e $hodCfg) {
-		$hodParam .= "--config=$hodCfg";
-	} else {
+    # With splitting of gateways, HOD file is always hodrc, no matter what
+    # cluster you're talking to.
+	my $hodCfg = "$hodRoot/conf/hodrc";
+	if (! (-e $hodCfg)) {
 		push(@cmd, "-Dhod.server=");
 		warn "I can't find HOD configuration for $cluster, hopefully you weren't planning on using HOD.\n";
 	}
@@ -118,8 +128,9 @@
 	# first, figure out if we are working with a deployed cluster 
 	if (!(defined $cluster) && (!(defined $classpath) || !($classpath =~/pig.jar/)))
 	{
-		# we are using default cluster
-		$cluster = 'kryptonite';
+		# we are using default cluster, the name of which is stored in the
+        # pigclient.conf file.
+		$cluster = $defaultCluster;
 	}
 
 	# we are running from a cluster

Modified: incubator/pig/branches/types/src/org/apache/pig/EvalFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/EvalFunc.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/EvalFunc.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/EvalFunc.java Tue Jan 22 13:17:12 2008
@@ -21,7 +21,6 @@
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 
-import org.apache.pig.data.Datum;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -43,7 +42,7 @@
  * @author database-systems@yahoo.research
  *
  */
-public abstract class EvalFunc<T extends Datum>  {
+public abstract class EvalFunc<T>  {
 	
 	protected Type returnType;
 	
@@ -70,9 +69,11 @@
 		
 		returnType = parameters[0];
 		
+        /*
 		if (returnType == Datum.class){
 			throw new RuntimeException("Eval function must return a specific type of Datum");
 		}
+        */
 		
 		
 		//Type check the initial, intermediate, and final functions
@@ -135,9 +136,10 @@
      * invocations of this method.
      * 
      * @param input the Tuple to be processed.
+     * @return result, of type T.
      * @throws IOException
      */
-    abstract public void exec(Tuple input, T output) throws IOException;
+    abstract public T exec(Tuple input) throws IOException;
     
     /**
      * @param input Schema of the input

Modified: incubator/pig/branches/types/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/Main.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/Main.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/Main.java Tue Jan 22 13:17:12 2008
@@ -25,10 +25,12 @@
 
 import org.apache.hadoop.util.HadoopExe;
 
+import org.apache.log4j.BasicConfigurator;
 import org.apache.log4j.Logger;
 import org.apache.log4j.Level;
 import org.apache.log4j.ConsoleAppender;
 import org.apache.log4j.PatternLayout;
+import org.apache.log4j.PropertyConfigurator;
 import org.apache.pig.PigServer.ExecType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.util.PigLogger;
@@ -65,11 +67,15 @@
 		int port = 0;
 		String file = null;
 		Level logLevel = Level.INFO;
+        boolean brief = false;
+        String log4jconf = null;
 		boolean verbose = false;
 
 		CmdLineParser opts = new CmdLineParser(args);
 		// Don't use -l, --latest, -c, --cluster, -cp, -classpath, -D as these
 		// are masked by the startup perl script.
+        opts.registerOpt('4', "log4jconf", CmdLineParser.ValueExpected.REQUIRED);
+        opts.registerOpt('b', "brief", CmdLineParser.ValueExpected.NOT_ACCEPTED);
 		opts.registerOpt('c', "cluster", CmdLineParser.ValueExpected.REQUIRED);
 		opts.registerOpt('d', "debug", CmdLineParser.ValueExpected.REQUIRED);
 		opts.registerOpt('e', "execute", CmdLineParser.ValueExpected.NOT_ACCEPTED);
@@ -83,6 +89,14 @@
 		char opt;
 		while ((opt = opts.getNextOpt()) != CmdLineParser.EndOfOpts) {
 			switch (opt) {
+            case '4':
+                log4jconf = opts.getValStr();
+                break;
+
+            case 'b':
+                brief = true;
+                break;
+
 			case 'c': {
 				// Needed away to specify the cluster to run the MR job on
 				// Bug 831708 - fixed
@@ -155,14 +169,35 @@
 
 		LogicalPlanBuilder.classloader = pigContext.createCl(null);
 
-		// Set the log level, and set up appenders
-		Logger log = PigLogger.getLogger();
-		log.setLevel(logLevel);
-		ConsoleAppender screen = new ConsoleAppender(new PatternLayout());
-		if (verbose) screen.setThreshold(logLevel);
-		else screen.setThreshold(Level.INFO);
-		screen.setTarget(ConsoleAppender.SYSTEM_ERR);
-		log.addAppender(screen);
+		if (log4jconf != null) {
+            PropertyConfigurator.configure(log4jconf);
+		} else if (!brief) {
+		    // non-brief logging - timestamps
+		    Properties props = new Properties();
+		    props.setProperty("log4j.rootLogger", "INFO, PIGCONSOLE");
+            props.setProperty("log4j.appender.PIGCONSOLE",
+                              "org.apache.log4j.ConsoleAppender");
+            props.setProperty("log4j.appender.PIGCONSOLE.layout",
+                              "org.apache.log4j.PatternLayout");
+            props.setProperty("log4j.appender.PIGCONSOLE.layout.ConversionPattern",
+                              "%d [%t] %-5p %c - %m%n");
+    	    PropertyConfigurator.configure(props);
+            // Set the log level/threshold
+            Logger.getRootLogger().setLevel(verbose ? Level.ALL : logLevel);
+		} else {
+		    // brief logging - no timestamps
+            Properties props = new Properties();
+            props.setProperty("log4j.rootLogger", "INFO, PIGCONSOLE");
+            props.setProperty("log4j.appender.PIGCONSOLE",
+                              "org.apache.log4j.ConsoleAppender");
+            props.setProperty("log4j.appender.PIGCONSOLE.layout",
+                              "org.apache.log4j.PatternLayout");
+            props.setProperty("log4j.appender.PIGCONSOLE.layout.ConversionPattern",
+                              "%m%n");
+            PropertyConfigurator.configure(props);
+            // Set the log level/threshold
+            Logger.getRootLogger().setLevel(verbose ? Level.ALL : logLevel);
+		}
 
 		// TODO Add a file appender for the logs
 		// TODO Need to create a property in the properties file for it.
@@ -247,6 +282,8 @@
 	System.err.println("       Pig [options] -e[xecute] cmd [cmd ...] : Run cmd(s).");
 	System.err.println("       Pig [options] [-f[ile]] file : Run cmds found in file.");
 	System.err.println("  options include:");
+    System.err.println("    -4, -log4jconf log4j configuration file, overrides log conf");
+    System.err.println("    -b, -brief brief logging (no timestamps)");
 	System.err.println("    -c, -cluster clustername, kryptonite is default");
 	System.err.println("    -d, -debug debug level, INFO is default");
 	System.err.println("    -h, -help display this message");

Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Tue Jan 22 13:17:12 2008
@@ -20,16 +20,22 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.PrintStream;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Enumeration;
+import java.util.Vector;
+import java.net.URL;
+import java.net.URI;
+import java.net.URISyntaxException;
 
 import org.apache.hadoop.dfs.DistributedFileSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.Datum;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
 import org.apache.pig.impl.io.FileLocalizer;
@@ -39,6 +45,8 @@
 import org.apache.pig.impl.logicalLayer.LOStore;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.LOVisitor;
+import org.apache.pig.impl.logicalLayer.LOPrinter;
 import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
@@ -47,6 +55,9 @@
 import org.apache.pig.impl.physicalLayer.POMapreduce;
 import org.apache.pig.impl.physicalLayer.POStore;
 import org.apache.pig.impl.physicalLayer.PhysicalPlan;
+import org.apache.pig.impl.physicalLayer.POVisitor;
+import org.apache.pig.impl.physicalLayer.POPrinter;
+import org.apache.pig.impl.util.PigLogger;
 
 
 
@@ -146,11 +157,59 @@
     	pigContext.registerFunction(function, functionSpec);
     }
     
-    public void registerJar(String path) throws IOException{
-    	File f = new File(path);
-    	if (!f.canRead())
-    		throw new IOException("Can't read " + path);
-        pigContext.addJar(path);
+    /**
+     * Looks up jarName via ClassLoader.getSystemResources and returns the
+     * first match, or null if there is none.  When pigContext.debug is set
+     * and several resources match, all of them are listed in a debug message.
+     */
+    private URL locateJarFromResources(String jarName) throws IOException {
+        Enumeration<URL> urls = ClassLoader.getSystemResources(jarName);
+        URL resourceLocation = urls.hasMoreElements() ? urls.nextElement() : null;
+
+        if (pigContext.debug && urls.hasMoreElements()) {
+            // Append each extra match exactly once, separated by "; ".
+            StringBuilder logMessage = new StringBuilder(
+                "Found multiple resources that match " + jarName + ": " + resourceLocation);
+            while (urls.hasMoreElements()) {
+                logMessage.append("; ").append(urls.nextElement());
+            }
+            PigLogger.getLogger().debug(logMessage.toString());
+        }
+
+        return resourceLocation;
+    }
+    
+    /**
+     * Registers a jar file. Name of the jar file can be an absolute or 
+     * relative path.
+     * 
+     * If multiple resources are found with the specified name, the
+     * first one is registered as returned by getSystemResources.
+     * The matches are logged at debug level if pigContext.debug is set.
+     * 
+     * @param name of the jar file to register
+     * @throws IOException
+     */
+    public void registerJar(String name) throws IOException {
+        // first try to locate jar via system resources
+        // if this fails, try by using "name" as File (this preserves 
+        // compatibility with case when user passes absolute path or path 
+    	// relative to current working directory.)    	
+        if (name != null) {
+            URL resource = locateJarFromResources(name);
+
+            if (resource == null) {
+                File f = new File(name);
+                
+                if (!f.canRead()) {
+                    throw new IOException("Can't read jar file: " + name);
+                }
+                
+                resource = f.toURI().toURL();
+            }
+
+            pigContext.addJar(resource);    	
+        }
     }
     
     /**
@@ -244,12 +303,7 @@
        		pp = physicalPlans.get(readFrom);
     	}
     	
-		// Data bags are guaranteed to contain tuples.
-    	//return pp.exec(continueFromLast).content();
-		// A direct subversion of the type system, this has to be bad.
-		Iterator<Datum> i = pp.exec(continueFromLast).content();
-		Object o = i;
-    	return (Iterator<Tuple>)o;
+    	return pp.exec(continueFromLast).iterator();
     	
     }
     
@@ -265,10 +319,8 @@
 
         readFrom.compile(queryResults);
         readFrom.exec();
-        if (pigContext.getExecType() == ExecType.LOCAL) {
-            Object o = readFrom.read().content();
-			return (Iterator<Tuple>)o;
-		}
+        if (pigContext.getExecType() == ExecType.LOCAL)
+            return readFrom.read().iterator();
         final LoadFunc p;
         
         try{
@@ -470,6 +522,35 @@
 	}
 
     /**
+     * Provide information on how a pig query will be executed.  For now
+     * this information is very developer focussed, and probably not very
+     * useful to the average user.
+     * @param alias Name of alias to explain.
+     * @param stream PrintStream to write explanation to.
+     * @throws IOException if the requested alias cannot be found.
+     */
+    public void explain(
+            String alias,
+            PrintStream stream) throws IOException {
+        // Look the alias up before printing anything, so an invalid alias
+        // fails without leaving a stray header on the stream.
+        IntermedResult ir = queryResults.get(alias);
+        if (ir == null) {
+            PigLogger.getLogger().error("Invalid alias: " + alias);
+            throw new IOException("Invalid alias: " + alias);
+        }
+        stream.println("Logical Plan:");
+        LOVisitor lprinter = new LOPrinter(stream);
+        ir.lp.getRoot().visit(lprinter);
+        stream.println("-----------------------------------------------");
+        stream.println("Physical Plan:");
+        // have to first compile the plan
+        ir.compile(queryResults);
+        POVisitor pprinter = new POPrinter(stream);
+        ir.pp.root.visit(pprinter);
+    }
+
+    /**
      * Returns the unused byte capacity of an HDFS filesystem. This value does
      * not take into account a replication factor, as that can vary from file
      * to file. Thus if you are using this to determine if you data set will fit
@@ -496,7 +577,7 @@
     public long fileSize(String filename) throws IOException {
         FileSystem dfs = pigContext.getDfs();
         Path p = new Path(filename);
-        long len = dfs.getLength(p);
+        long len = dfs.getFileStatus(p).getLen();
         long replication = dfs.getDefaultReplication(); // did not work, for some reason: dfs.getReplication(p);
         return len * replication;
     }
@@ -518,10 +599,10 @@
     }
     
     public String[] listPaths(String dir) throws IOException {
-        Path paths[] = pigContext.getDfs().listPaths(new Path(dir));
-        String strPaths[] = new String[paths.length];
-        for (int i = 0; i < paths.length; i++) {
-            strPaths[i] = paths[i].toString();
+        FileStatus stats[] = pigContext.getDfs().listStatus(new Path(dir));
+        String strPaths[] = new String[stats.length];
+        for (int i = 0; i < stats.length; i++) {
+            strPaths[i] = stats[i].getPath().toString();
         }
         return strPaths;
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/ARITY.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/ARITY.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/ARITY.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/ARITY.java Tue Jan 22 13:17:12 2008
@@ -20,17 +20,17 @@
 import java.io.IOException;
 
 import org.apache.pig.EvalFunc;
-import org.apache.pig.data.DataAtom;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.schema.AtomSchema;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 
-public class ARITY extends EvalFunc<DataAtom> {
+public class ARITY extends EvalFunc<Integer> {
 
     @Override
-    public void exec(Tuple input, DataAtom output) throws IOException {
-        output.setValue(input.arity());
+    public Integer exec(Tuple input) throws IOException {
+        return new Integer(input.size());
     }
+
     @Override
     public Schema outputSchema(Schema input) {
         return new AtomSchema("arity");

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/AVG.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/AVG.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/AVG.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/AVG.java Tue Jan 22 13:17:12 2008
@@ -22,9 +22,9 @@
 
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
-import org.apache.pig.data.DataAtom;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.schema.AtomSchema;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 
@@ -33,10 +33,12 @@
  * Generates the average of the values of the first field of a tuple. This class is Algebraic in
  * implemenation, so if possible the execution will be split into a local and global application
  */
-public class AVG extends EvalFunc<DataAtom> implements Algebraic {
+public class AVG extends EvalFunc<Double> implements Algebraic {
+    
+    private static TupleFactory mTupleFactory = TupleFactory.getInstance();
 
     @Override
-	public void exec(Tuple input, DataAtom output) throws IOException {
+    public Double exec(Tuple input) throws IOException {
         double sum = sum(input);
         double count = count(input);
 
@@ -44,7 +46,7 @@
         if (count > 0)
             avg = sum / count;
 
-        output.setValue(avg);
+        return new Double(avg);
     }
 
     public String getInitial() {
@@ -61,79 +63,72 @@
 
     static public class Initial extends EvalFunc<Tuple> {
         @Override
-		public void exec(Tuple input, Tuple output) throws IOException {
-        	try {
-            output.appendField(new DataAtom(sum(input)));
-            output.appendField(new DataAtom(count(input)));
-            output.appendField(new DataAtom("processed by initial"));
-        	} catch(RuntimeException t) {
-        		throw new RuntimeException(t.getMessage() + ": " + input);
-        	}
+        public Tuple exec(Tuple input) throws IOException {
+            try {
+                Tuple t = mTupleFactory.newTuple(2);
+                t.set(0, sum(input));    // partial sum (boxed double)
+                t.set(1, count(input));  // partial count (boxed long)
+                return t;
+            } catch(RuntimeException e) {
+                throw new RuntimeException(e.getMessage() + ": " + input, e);
+            }
         }
     }
 
     static public class Intermed extends EvalFunc<Tuple> {
         @Override
-		public void exec(Tuple input, Tuple output) throws IOException {
-            combine(input.getBagField(0), output);
+        public Tuple exec(Tuple input) throws IOException {
+            DataBag b = (DataBag)input.get(0);
+            return combine(b);
         }
     }
 
-    static public class Final extends EvalFunc<DataAtom> {
+    static public class Final extends EvalFunc<Double> {
         @Override
-		public void exec(Tuple input, DataAtom output) throws IOException {
-            Tuple combined = new Tuple();
-            if(input.getField(0) instanceof DataBag) {
-                combine(input.getBagField(0), combined);    
-            } else {
-                throw new RuntimeException("Bag not found in: " + input);
-                
-                
-                //combined = input.getTupleField(0);
-            }
-            double sum = combined.getAtomField(0).numval();
-            double count = combined.getAtomField(1).numval();
+        public Double exec(Tuple input) throws IOException {
+            DataBag b = (DataBag)input.get(0);
+            Tuple combined = combine(b);
+
+            double sum = (Double)combined.get(0);
+            double count = (Long)combined.get(1);
 
             double avg = 0;
             if (count > 0) {
                 avg = sum / count;
             }
-            output.setValue(avg);
+            return new Double(avg);
         }
     }
 
-    static protected void combine(DataBag values, Tuple output) throws IOException {
+    static protected Tuple combine(DataBag values) throws IOException {
         double sum = 0;
-        double count = 0;
+        long count = 0;
 
-        for (Iterator it = values.content(); it.hasNext();) {
-            Tuple t = (Tuple) it.next();
-//            if(!(t.getField(0) instanceof DataAtom)) {
-//                throw new RuntimeException("Unexpected Type: " + t.getField(0).getClass().getName() + " in " + t);
-//            }
-            
-            sum += t.getAtomField(0).numval();
-            count += t.getAtomField(1).numval();
+        Tuple output = mTupleFactory.newTuple(2);
+
+        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+            Tuple t = it.next();
+            sum += (Double)t.get(0);
+            count += (Long)t.get(1);
         }
 
-        output.appendField(new DataAtom(sum));
-        output.appendField(new DataAtom(count));
+        output.set(0, new Double(sum));
+        output.set(1, new Long(count));
+        return output;
     }
 
     static protected long count(Tuple input) throws IOException {
-        DataBag values = input.getBagField(0);
-
-        
-        return values.cardinality();
+        DataBag values = (DataBag)input.get(0);
+        return values.size();
     }
 
     static protected double sum(Tuple input) throws IOException {
-        DataBag values = input.getBagField(0);
+        DataBag values = (DataBag)input.get(0);
 
         double sum = 0;
-        for (Iterator it = values.content(); it.hasNext();) {
-            Tuple t = (Tuple) it.next();
-            sum += t.getAtomField(0).numval();
+        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+            Tuple t = it.next();
+            sum += (Double)t.get(0);
         }
 
         return sum;

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java Tue Jan 22 13:17:12 2008
@@ -26,11 +26,16 @@
 
 import org.apache.pig.LoadFunc;
 import org.apache.pig.StoreFunc;
+import org.apache.pig.data.DataReaderWriter;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
 
 
 public class BinStorage implements LoadFunc, StoreFunc {
+    public static final byte RECORD_1 = 0x21;
+    public static final byte RECORD_2 = 0x31;
+    public static final byte RECORD_3 = 0x41;
+
     Iterator<Tuple>     i              = null;
     protected BufferedPositionedInputStream in = null;
     private DataInputStream inData = null;
@@ -51,25 +56,23 @@
                 return null;
             }
             b = (byte) in.read();
-            if(b != Tuple.RECORD_1 && b != -1) {
+            if(b != RECORD_1 && b != -1) {
                 continue;
             }
             if(b == -1) return null;
             b = (byte) in.read();
-            if(b != Tuple.RECORD_2 && b != -1) {
+            if(b != RECORD_2 && b != -1) {
                 continue;
             }
             if(b == -1) return null;
             b = (byte) in.read();
-            if(b != Tuple.RECORD_3 && b != -1) {
+            if(b != RECORD_3 && b != -1) {
                 continue;
             }
             if(b == -1) return null;
             break;
         }
-        Tuple t = new Tuple();
-        t.readFields(inData);
-        return t;
+        return (Tuple)DataReaderWriter.readDatum(inData);
     }
 
 	public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long end) throws IOException {
@@ -90,9 +93,9 @@
     }
 
     public void putNext(Tuple t) throws IOException {
-        out.write(Tuple.RECORD_1);
-        out.write(Tuple.RECORD_2);
-        out.write(Tuple.RECORD_3);
+        out.write(RECORD_1);
+        out.write(RECORD_2);
+        out.write(RECORD_3);
         t.write(out);
     }
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java Tue Jan 22 13:17:12 2008
@@ -19,14 +19,14 @@
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.Map;
 
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
-import org.apache.pig.data.DataAtom;
 import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataMap;
-import org.apache.pig.data.Datum;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.schema.AtomSchema;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 
@@ -34,11 +34,11 @@
  * Generates the count of the values of the first field of a tuple. This class is Algebraic in
  * implemenation, so if possible the execution will be split into a local and global functions
  */
-public class COUNT extends EvalFunc<DataAtom> implements Algebraic{
+public class COUNT extends EvalFunc<Long> implements Algebraic{
 
     @Override
-	public void exec(Tuple input, DataAtom output) throws IOException {
-        output.setValue(count(input));
+    public Long exec(Tuple input) throws IOException {
+        return count(input);
     }
 
     public String getInitial() {
@@ -54,43 +54,50 @@
     }
 
     static public class Initial extends EvalFunc<Tuple> {
+        TupleFactory tfact = TupleFactory.getInstance();
+
         @Override
-		public void exec(Tuple input, Tuple output) throws IOException {
-            output.appendField(new DataAtom(count(input)));
+        public Tuple exec(Tuple input) throws IOException {
+            return tfact.newTuple(count(input));
         }
     }
 
     static public class Intermed extends EvalFunc<Tuple> {
+        TupleFactory tfact = TupleFactory.getInstance();
+
         @Override
-		public void exec(Tuple input, Tuple output) throws IOException {
-            output.appendField(new DataAtom(sum(input)));
+        public Tuple exec(Tuple input) throws IOException {
+            return tfact.newTuple(sum(input)); // add up the partial counts
         }
     }
 
-    static public class Final extends EvalFunc<DataAtom> {
+    static public class Final extends EvalFunc<Long> {
         @Override
-		public void exec(Tuple input, DataAtom output) throws IOException {
-            output.setValue(sum(input));
+        public Long exec(Tuple input) throws IOException {
+            return sum(input);
         }
     }
 
-    static protected long count(Tuple input) throws IOException {
-        Datum values = input.getField(0);        
+    static protected Long count(Tuple input) throws IOException {
+        Object values = input.get(0);        
         if (values instanceof DataBag)
-        	return ((DataBag)values).cardinality();
-        else if (values instanceof DataMap)
-        	return ((DataMap)values).cardinality();
+            return ((DataBag)values).size();
+        else if (values instanceof Map)
+            return new Long(((Map)values).size());
         else
-        	throw new IOException("Cannot count a " + values.getClass().getSimpleName());
+            throw new IOException("Cannot count a " +
+                DataType.findTypeName(values));
     }
 
-    static protected double sum(Tuple input) throws IOException {
-        DataBag values = input.getBagField(0);
-        double sum = 0;
-        for (Iterator<Datum> it = values.content(); it.hasNext();) {
-            Tuple t = (Tuple)it.next();
+    static protected Long sum(Tuple input) throws IOException {
+        DataBag values = (DataBag)input.get(0);
+        long sum = 0;
+        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+            Tuple t = it.next();
             try {
-                sum += t.getAtomField(0).numval();
+                // Have faith here.  Checking each value before the cast is
+                // just too much.
+                sum += (Long)t.get(0);
             } catch (NumberFormatException exp) {
                 throw new IOException(exp.getClass().getName() + ":" + exp.getMessage());
             }
@@ -98,7 +105,7 @@
         return sum;
     }
 
-@Override
+    @Override
     public Schema outputSchema(Schema input) {
         return new AtomSchema("count" + count++);
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/DIFF.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/DIFF.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/DIFF.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/DIFF.java Tue Jan 22 13:17:12 2008
@@ -21,11 +21,10 @@
 import java.util.Iterator;
 
 import org.apache.pig.EvalFunc;
-import org.apache.pig.data.DataAtom;
+import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.Datum;
-import org.apache.pig.data.AtomicDatum;
+import org.apache.pig.data.TupleFactory;
 
 
 /**
@@ -37,46 +36,82 @@
  *
  */
 public class DIFF extends EvalFunc<DataBag> {
+    TupleFactory mTupleFactory = TupleFactory.getInstance();
+    BagFactory mBagFactory = BagFactory.getInstance();
+
     /**
      * Compares a tuple with two fields. Emits any differences.
      * @param input a tuple with exactly two fields.
      * @throws IOException if there are not exactly two fields in a tuple
      */
     @Override
-    public void exec(Tuple input, DataBag output) throws IOException {
-        if (input.arity() != 2) {
-            throw new IOException("DIFF must compare two fields not " + input.arity());
+    public DataBag exec(Tuple input) throws IOException {
+        if (input.size() != 2) {
+            throw new IOException("DIFF must compare two fields not " +
+                input.size());
         }
-		if (input.getField(0).getType() == Datum.DataType.BAG) {
-            DataBag field1 = input.getBagField(0);
-            DataBag field2 = input.getBagField(1);
-            Iterator<Datum> it1 = field1.content();
-            checkInBag(field2, it1, output);
-            Iterator<Datum> it2 = field2.content();
-            checkInBag(field1, it2, output);
+        DataBag output = mBagFactory.newDefaultBag();
+        Object o1 = input.get(0);
+        if (o1 instanceof DataBag) {
+            DataBag bag1 = (DataBag)o1;
+            DataBag bag2 = (DataBag)input.get(1);
+            computeDiff(bag1, bag2, output);
         } else {
-            AtomicDatum d1 = input.getAtomField(0);
-            AtomicDatum d2 = input.getAtomField(1);
+            Object d1 = input.get(0);
+            Object d2 = input.get(1);
             if (!d1.equals(d2)) {
-                output.add(new Tuple(d1));
-                output.add(new Tuple(d2));
+                output.add(mTupleFactory.newTuple(d1));
+                output.add(mTupleFactory.newTuple(d2));
             }
         }
+        return output;
     }
 
-    private void checkInBag(DataBag bag, Iterator<Datum> iterator, DataBag emitTo) throws IOException {
-        while(iterator.hasNext()) {
-            Datum t = iterator.next();
-            Iterator<Datum> it2 = bag.content();
-            boolean found = false;
-            while(it2.hasNext()) {
-                if (t.equals(it2.next())) {
-                    found = true;
-                }
-            }
-            if (!found) {
-                emitTo.add(t);
+    private void computeDiff(
+            DataBag bag1,
+            DataBag bag2,
+            DataBag emitTo) throws IOException {
+        // Copy both inputs into distinct bags so duplicates are ignored.
+        // NOTE(review): the merge below assumes distinct bags iterate in
+        // sorted (compareTo) order -- confirm against DistinctDataBag.
+        DataBag d1 = mBagFactory.newDistinctBag();
+        DataBag d2 = mBagFactory.newDistinctBag();
+        Iterator<Tuple> i1 = bag1.iterator();
+        Iterator<Tuple> i2 = bag2.iterator();
+        while (i1.hasNext()) d1.add(i1.next());
+        while (i2.hasNext()) d2.add(i2.next());
+
+        i1 = d1.iterator();
+        i2 = d2.iterator();
+        // null marks an exhausted side; either bag may be empty.
+        Tuple t1 = i1.hasNext() ? i1.next() : null;
+        Tuple t2 = i2.hasNext() ? i2.next() : null;
+        while (t1 != null && t2 != null) {
+            int c = t1.compareTo(t2);
+            if (c < 0) {
+                // t1 has no match in bag2: emit it and advance i1
+                emitTo.add(t1);
+                t1 = i1.hasNext() ? i1.next() : null;
+            } else if (c > 0) {
+                // t2 has no match in bag1: emit it and advance i2
+                emitTo.add(t2);
+                t2 = i2.hasNext() ? i2.next() : null;
+            } else {
+                // present in both: emit neither, advance both
+                t1 = i1.hasNext() ? i1.next() : null;
+                t2 = i2.hasNext() ? i2.next() : null;
             }
+        }
+
+        // One side ran out; whatever is still held or unread on the other
+        // side has no counterpart, so emit all of it.
+        while (t1 != null) {
+            emitTo.add(t1);
+            t1 = i1.hasNext() ? i1.next() : null;
+        }
+        while (t2 != null) {
+            emitTo.add(t2);
+            t2 = i2.hasNext() ? i2.next() : null;
         }
     }
     

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/IsEmpty.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/IsEmpty.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/IsEmpty.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/IsEmpty.java Tue Jan 22 13:17:12 2008
@@ -18,25 +18,26 @@
 package org.apache.pig.builtin;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.pig.FilterFunc;
 import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataMap;
-import org.apache.pig.data.Datum;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.DataType;
 
 
 public class IsEmpty extends FilterFunc {
 
     @Override
     public boolean exec(Tuple input) throws IOException {
-    	Datum values = input.getField(0);        
+        Object values = input.get(0);        
         if (values instanceof DataBag)
-        	return ((DataBag)values).cardinality() == 0;
-        else if (values instanceof DataMap)
-        	return ((DataMap)values).cardinality() == 0;
+            return ((DataBag)values).size() == 0;
+        else if (values instanceof Map)
+            return ((Map)values).size() == 0;
         else
-        	throw new IOException("Cannot test a " + values.getClass().getSimpleName() + " for emptiness.");
+            throw new IOException("Cannot test a " +
+                DataType.findTypeName(values) + " for emptiness.");
     }
 
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java Tue Jan 22 13:17:12 2008
@@ -22,9 +22,9 @@
 
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
-import org.apache.pig.data.DataAtom;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.schema.AtomSchema;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 
@@ -32,58 +32,67 @@
 /**
  * Generates the max of the values of the first field of a tuple.
  */
-public class MAX extends EvalFunc<DataAtom> implements Algebraic {
+public class MAX extends EvalFunc<Double> implements Algebraic {
 
-	@Override
-	public void exec(Tuple input, DataAtom output) throws IOException {
-		output.setValue(max(input));
-	}
-
-	public String getInitial() {
-		return Initial.class.getName();
-	}
-
-	public String getIntermed() {
-		return Initial.class.getName();
-	}
-
-	public String getFinal() {
-		return Final.class.getName();
-	}
-
-	static public class Initial extends EvalFunc<Tuple> {
-		@Override
-		public void exec(Tuple input, Tuple output) throws IOException {
-			output.appendField(new DataAtom(max(input)));
-		}
-	}
-	static public class Final extends EvalFunc<DataAtom> {
-		@Override
-		public void exec(Tuple input, DataAtom output) throws IOException {
-			output.setValue(max(input));
-		}
-	}
-
-	static protected double max(Tuple input) throws IOException {
-		DataBag values = input.getBagField(0);
-
-		double curMax = Double.NEGATIVE_INFINITY;
-		for (Iterator it = values.content(); it.hasNext();) {
-			Tuple t = (Tuple) it.next();
-			try {
-				curMax = java.lang.Math.max(curMax, t.getAtomField(0).numval());
-			}catch(RuntimeException exp) {
-				throw new IOException("Error processing: " + t.toString(), exp);
-
-			}
-		}
-
-		return curMax;
-	}
-	@Override
-	public Schema outputSchema(Schema input) {
-		return new AtomSchema("max" + count++);
-	}
+    @Override
+    public Double exec(Tuple input) throws IOException {
+         return max(input);
+    }
+
+    public String getInitial() {
+        return Initial.class.getName();
+    }
+
+    public String getIntermed() {
+        return Initial.class.getName();
+    }
+
+    public String getFinal() {
+        return Final.class.getName();
+    }
+
+    static public class Initial extends EvalFunc<Tuple> {
+        TupleFactory tfact = TupleFactory.getInstance();
+
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            return tfact.newTuple(max(input));
+        }
+    }
+    static public class Final extends EvalFunc<Double> {
+        @Override
+        public Double exec(Tuple input) throws IOException {
+            return max(input);
+        }
+    }
+
+    /** Returns the largest first-field value among the tuples of the bag in
+     *  input field 0, or NEGATIVE_INFINITY if the bag is empty. */
+    static protected Double max(Tuple input) throws IOException {
+        Object o = input.get(0);
+        if (!(o instanceof DataBag)) {
+            throw new IOException("Input to max function should be a bag");
+        }
+        double curMax = Double.NEGATIVE_INFINITY;
+        for (Iterator<Tuple> it = ((DataBag)o).iterator(); it.hasNext();) {
+            Tuple t = it.next();
+            try {
+                curMax = java.lang.Math.max(curMax, (Double)t.get(0));
+            } catch (RuntimeException exp) {
+                IOException newE = new IOException("Error processing: " +
+                    t.toString() + ": " + exp.getMessage());
+                newE.initCause(exp);
+                throw newE;
+            }
+        }
+
+        return curMax;
+    }
+
+    @Override
+    public Schema outputSchema(Schema input) {
+        return new AtomSchema("max" + count++);
+    }
 
-	private static int count = 1;
+    private static int count = 1;
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java Tue Jan 22 13:17:12 2008
@@ -22,9 +22,9 @@
 
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
-import org.apache.pig.data.DataAtom;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.schema.AtomSchema;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 
@@ -32,57 +32,63 @@
 /**
  * Generates the min of the values of the first field of a tuple.
  */
-public class MIN extends EvalFunc<DataAtom> implements Algebraic {
+public class MIN extends EvalFunc<Double> implements Algebraic {
 
-	@Override
-	public void exec(Tuple input, DataAtom output) throws IOException {
-		output.setValue(min(input));
-	}
-
-	public String getInitial() {
-		return Initial.class.getName();
-	}
-
-	public String getIntermed() {
-		return Initial.class.getName();
-	}
-
-	public String getFinal() {
-		return Final.class.getName();
-	}
-
-	static public class Initial extends EvalFunc<Tuple> {
-		@Override
-		public void exec(Tuple input, Tuple output) throws IOException {
-			output.appendField(new DataAtom(min(input)));
-		}
-	}
-	static public class Final extends EvalFunc<DataAtom> {
-		@Override
-		public void exec(Tuple input, DataAtom output) throws IOException {
-			output.setValue(min(input));
-		}
-	}
-
-	static protected double min(Tuple input) throws IOException {
-		DataBag values = input.getBagField(0);
-
-		double curMin = Double.POSITIVE_INFINITY;
-		for (Iterator it = values.content(); it.hasNext();) {
-			Tuple t = (Tuple) it.next();
-			try {
-				curMin = java.lang.Math.min(curMin, t.getAtomField(0).numval());
-			}catch(RuntimeException exp) {
-				throw new IOException("Error processing: " + t.toString(), exp);
-		}
-	}
-
-	return curMin;
-}
-	@Override
-	public Schema outputSchema(Schema input) {
-		return new AtomSchema("min" + count++);
-	}
+    @Override
+    public Double exec(Tuple input) throws IOException {
+        return min(input);
+    }
+
+    public String getInitial() {
+        return Initial.class.getName();
+    }
+
+    public String getIntermed() {
+        return Initial.class.getName();
+    }
+
+    public String getFinal() {
+        return Final.class.getName();
+    }
+
+    static public class Initial extends EvalFunc<Tuple> {
+        TupleFactory tfact = TupleFactory.getInstance();
+
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            return tfact.newTuple(min(input));
+        }
+    }
+    static public class Final extends EvalFunc<Double> {
+        @Override
+        public Double exec(Tuple input) throws IOException {
+            return min(input);
+        }
+    }
+
+    // Smallest Double in field 0 of the bag's tuples (+Infinity if empty).
+    static protected Double min(Tuple input) throws IOException {
+        Object o = input.get(0);
+        if (!(o instanceof DataBag)) {
+            throw new IOException("Input to min function should be a bag");
+        }
+        double curMin = Double.POSITIVE_INFINITY;
+        for (Iterator it = ((DataBag)o).iterator(); it.hasNext();) {
+            Tuple t = (Tuple) it.next();
+            try {
+                curMin = java.lang.Math.min(curMin, (Double)t.get(0));
+            } catch (RuntimeException exp) {
+                throw (IOException)new IOException("Error processing: " +
+                    t.toString() + exp.getMessage()).initCause(exp);
+            }
+        }
+        return curMin; // autoboxed to Double
+    }
+
+    @Override
+    public Schema outputSchema(Schema input) {
+        return new AtomSchema("min" + count++);
+    }
 
-	private static int count = 1;
+    private static int count = 1;
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java Tue Jan 22 13:17:12 2008
@@ -18,18 +18,22 @@
 package org.apache.pig.builtin;
 
 import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Iterator;
 
 import org.apache.pig.LoadFunc;
 import org.apache.pig.StoreFunc;
-import org.apache.pig.data.TimestampedTuple;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
 
 
@@ -40,11 +44,13 @@
  */
 public class PigStorage implements LoadFunc, StoreFunc {
     protected BufferedPositionedInputStream in = null;
-    private DataInputStream inData = null;
         
-	long                end            = Long.MAX_VALUE;
-	private String recordDel = "\n";
-	private String fieldDel = "\t";
+    long                end            = Long.MAX_VALUE;
+    private byte recordDel = '\n';
+    private byte fieldDel = '\t';
+    private ByteArrayOutputStream mBuf = new ByteArrayOutputStream(4096); // set here so PigStorage() works too
+    private ArrayList<Object> mProtoTuple = new ArrayList<Object>();
+    private TupleFactory mTupleFactory = TupleFactory.getInstance();
     
     public PigStorage() {
     }
@@ -53,28 +59,44 @@
      * Constructs a Pig loader that uses specified regex as a field delimiter.
      * 
      * @param delimiter
-     *            the regular expression that is used to separate fields. ("\t" is the default.) See
-     *            http://java.sun.com/j2se/1.5.0/docs/api/java/util/regex/Pattern.html for complete
-     *            explanation.
+     *            the single byte character that is used to separate fields.
+     *            ("\t" is the default.)
      */
     public PigStorage(String delimiter) {
-        this.fieldDel = delimiter;
+        this.fieldDel = (byte)delimiter.charAt(0);
+        mBuf = new ByteArrayOutputStream(4096);
+        mProtoTuple = new ArrayList<Object>();
     }
 
-	public Tuple getNext() throws IOException {
+    public Tuple getNext() throws IOException {
         if (in == null || in.getPosition() > end) {
             return null;
         }
-        String line;
-        if((line = inData.readLine()) != null) {            
-            return new Tuple(line, fieldDel);
+
+        mBuf.reset();
+        while (true) {
+            // Hadoop's FSDataInputStream (which my input stream is based
+            // on at some point) is buffered, so I don't need to buffer.
+            int b = in.read();
+            if (b == fieldDel) {
+                readField();
+            } else if (b == -1 && mBuf.size() == 0 && mProtoTuple.isEmpty()) {
+                return null; // end of file with no partial record pending
+            } else if (b == recordDel || b == -1) {
+                // A file whose last line lacks the record delimiter still
+                // ends that record at EOF instead of dropping it.
+                readField();
+                Tuple t =  mTupleFactory.newTuple(mProtoTuple);
+                mProtoTuple.clear();
+                return t;
+            } else {
+                mBuf.write(b);
+            }
         }
-        return null;
     }
 
-	public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long end) throws IOException {
+    public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long end) throws IOException {
         this.in = in;
-        inData = new DataInputStream(in);
         this.end = end;
         
         // Since we are not block aligned we throw away the first
@@ -84,16 +106,87 @@
         }
     }
     
-    OutputStream os;
+    OutputStream mOut;
     public void bindTo(OutputStream os) throws IOException {
-        this.os = os;
+        mOut = os;
     }
 
     public void putNext(Tuple f) throws IOException {
-        os.write((f.toDelimitedString(this.fieldDel) + this.recordDel).getBytes());
+        // Writes one tuple as a text record: fields are separated by
+        // fieldDel and the record ends with recordDel.  A field of type
+        // NULL contributes no bytes at all.
+        //
+        // I have to convert integer fields to string, and then to bytes.
+        // If I use a DataOutputStream to convert directly from integer to
+        // bytes, I don't get a string representation.
+        int sz = f.size();
+        for (int i = 0; i < sz; i++) {
+            Object field = f.get(i);
+            switch (DataType.findType(field)) {
+            case DataType.NULL:
+                break; // just leave it empty
+
+            // Scalars are stored as the text of their toString() form.
+            case DataType.BOOLEAN:
+            case DataType.INTEGER:
+            case DataType.LONG:
+            case DataType.FLOAT:
+            case DataType.DOUBLE:
+                mOut.write(field.toString().getBytes());
+                break;
+
+            case DataType.BYTEARRAY: {
+                // raw bytes are passed through untouched
+                byte[] b = ((DataByteArray)field).get();
+                mOut.write(b, 0, b.length);
+                break;
+            }
+
+            case DataType.CHARARRAY:
+                // getBytes() with no argument encodes using the platform
+                // default charset.
+                mOut.write(((String)field).getBytes());
+                break;
+
+            // Nested values have no flat text form here, so refuse them.
+            case DataType.MAP:
+            case DataType.TUPLE:
+            case DataType.BAG:
+                throw new IOException("Cannot store a non-flat tuple " +
+                    "using PigStorage");
+
+            // findType() reported a type code not handled above.
+            default:
+                throw new RuntimeException("Unknown datatype " +
+                    DataType.findType(field));
+            }
+
+            if (i < sz - 1) {
+                // more fields follow in this record
+                mOut.write(fieldDel);
+            }
+        }
+
+        // Terminate the record after the loop rather than on the last
+        // field, so that a tuple with no fields still produces an empty
+        // line instead of vanishing from the output.
+        mOut.write(recordDel);
     }
 
     public void finish() throws IOException {
+    }
+
+    private void readField() {
+        if (mBuf.size() == 0) {
+            // NULL value
+            mProtoTuple.add(null);
+        } else {
+            // TODO, once this can take schemas, we need to figure out
+            // if the user requested this to be viewed as a certain
+            // type, and if so, then construct it appropriately.
+            mProtoTuple.add(new DataByteArray(mBuf.toByteArray()));
+        }
+        mBuf.reset();
     }
 
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/SUM.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/SUM.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/SUM.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/SUM.java Tue Jan 22 13:17:12 2008
@@ -22,9 +22,9 @@
 
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
-import org.apache.pig.data.DataAtom;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.schema.AtomSchema;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 
@@ -32,11 +32,11 @@
 /**
  * Generates the sum of the values of the first field of a tuple.
  */
-public class SUM extends EvalFunc<DataAtom> implements Algebraic {
+public class SUM extends EvalFunc<Double> implements Algebraic {
 
     @Override
-    public void exec(Tuple input, DataAtom output) throws IOException {
-        output.setValue(sum(input));
+    public Double exec(Tuple input) throws IOException {
+        return sum(input);
     }
 
     public String getInitial() {
@@ -52,40 +52,43 @@
     }
 
     static public class Initial extends EvalFunc<Tuple> {
+        TupleFactory tfact = TupleFactory.getInstance();
+
         @Override
-		public void exec(Tuple input, Tuple output) throws IOException {
-            output.appendField(new DataAtom(sum(input)));
+        public Tuple exec(Tuple input) throws IOException {
+            return tfact.newTuple(sum(input));
         }
     }
-    static public class Final extends EvalFunc<DataAtom> {
+    static public class Final extends EvalFunc<Double> {
         @Override
-		public void exec(Tuple input, DataAtom output) throws IOException {
-            output.setValue(sum(input));
+        public Double exec(Tuple input) throws IOException {
+            return sum(input);
         }
     }
 
     static protected double sum(Tuple input) throws IOException {
-        DataBag values = input.getBagField(0);
+        DataBag values = (DataBag)input.get(0);
 
         double sum = 0;
-	int i = 0;
+        int i = 0;
         Tuple t = null;
-        for (Iterator it = values.content(); it.hasNext();) {
+        for (Iterator it = values.iterator(); it.hasNext();) {
             try {
             t = (Tuple) it.next();
-	    i++;
-            sum += t.getAtomField(0).numval();
+            i++;
+            sum += (Double)t.get(0);
             }catch(RuntimeException exp) {
-		String msg = "iteration = " + i + "bag size = " + values.cardinality() + " partial sum = " + sum + "\n";
-		if (t != null)
-			msg += "previous tupple = " + t.toString();
-		throw new RuntimeException(exp.getMessage() + " additional info: " + msg);
-                //throw new RuntimeException(exp.getMessage() + " error processing: " + t.toString());
+                // Chain exp as the cause so its stack trace is not lost.
+                String msg = "iteration = " + i + " bag size = " + values.size()
+                    + " partial sum = " + sum + "\n";
+                if (t != null) msg += "previous tupple = " + t.toString();
+                throw new RuntimeException(exp.getMessage() + " additional info: " + msg, exp);
             }
         }
 
         return sum;
     }
+
     @Override
     public Schema outputSchema(Schema input) {
         return new AtomSchema("sum" + count++);

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/TOKENIZE.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/TOKENIZE.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/TOKENIZE.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/TOKENIZE.java Tue Jan 22 13:17:12 2008
@@ -21,22 +21,28 @@
 import java.util.StringTokenizer;
 
 import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.schema.AtomSchema;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
 
 
 public class TOKENIZE extends EvalFunc<DataBag> {
+    TupleFactory mTupleFactory = TupleFactory.getInstance();
+    BagFactory mBagFactory = BagFactory.getInstance();
 
     @Override
-    public void exec(Tuple input, DataBag output) throws IOException {
-        String str = input.getAtomField(0).strval();
+    public DataBag exec(Tuple input) throws IOException {
+        DataBag output = mBagFactory.newDefaultBag();
+        String str = (String)input.get(0);
         StringTokenizer tok = new StringTokenizer(str, " \",()*", false);
         while (tok.hasMoreTokens()) {
-            output.add(new Tuple(tok.nextToken()));
+            output.add(mTupleFactory.newTuple(tok.nextToken()));
         }
+        return output;
     }
 
     @Override

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java Tue Jan 22 13:17:12 2008
@@ -17,12 +17,13 @@
  */
 package org.apache.pig.builtin;
 
-import java.io.DataInputStream;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
 import java.io.IOException;
 
 import org.apache.pig.LoadFunc;
-import org.apache.pig.data.DataAtom;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
 
 
@@ -32,13 +33,13 @@
  */
 public class TextLoader implements LoadFunc{
 	BufferedPositionedInputStream in;
-	private DataInputStream inData = null;
-    
+	private BufferedReader inData = null;
 	long                end;
+    private TupleFactory mTupleFactory = TupleFactory.getInstance();
 
 	public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long end) throws IOException {
         this.in = in;
-        inData = new DataInputStream(in);
+        inData = new BufferedReader(new InputStreamReader(in, "UTF8"));
         this.end = end;
         // Since we are not block aligned we throw away the first
         // record and cound on a different instance to read it
@@ -51,9 +52,7 @@
             return null;
         String line;
         if ((line = inData.readLine()) != null) {
-            Tuple t = new Tuple(1);
-            t.setField(0, new DataAtom(line));
-            return t;
+            return mTupleFactory.newTuple(new String(line));
         }
         return null;
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/data/AmendableTuple.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/AmendableTuple.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/AmendableTuple.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/AmendableTuple.java Tue Jan 22 13:17:12 2008
@@ -17,18 +17,18 @@
  */
 package org.apache.pig.data;
 
-public class AmendableTuple extends Tuple {
-    Datum amendKey;       // the identifier of the group to which this tuple belongs.
+public class AmendableTuple extends DefaultTuple {
+    Object amendKey;       // the identifier of the group to which this tuple belongs.
 
-    public AmendableTuple(int numFields, Datum amendKey) {
+    public AmendableTuple(int numFields, Object amendKey) {
         super(numFields);
         this.amendKey = amendKey;
     }
     
-    public Datum getAmendKey() {
+    public Object getAmendKey() {
         return amendKey;
     }
-    public void setAmendKey(Datum amendKey) {
+    public void setAmendKey(Object amendKey) {
         this.amendKey = amendKey;
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/data/BagFactory.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/BagFactory.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/BagFactory.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/BagFactory.java Tue Jan 22 13:17:12 2008
@@ -17,40 +17,99 @@
  */
 package org.apache.pig.data;
 
-import java.io.File;
-import java.io.IOException;
-
-public class BagFactory {
-
-    private File              tmpdir = null;
-    private static BagFactory instance = new BagFactory();
-
+import java.lang.Class;
+import java.lang.ClassLoader;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import org.apache.pig.impl.eval.EvalSpec;
+import org.apache.pig.impl.util.SpillableMemoryManager;
+
+/**
+ * A bag factory.  Can be used to generate different types of bags
+ * depending on what is needed.  This class is abstract so that users can
+ * override the bag factory if they desire to provide their own that
+ * returns their implementation of a bag.  If the property
+ * pig.data.bag.factory.name is set to a class name and
+ * pig.data.bag.factory.jar is set to a URL pointing to a jar that
+ * contains the above named class, then getInstance() will create
+ * an instance of the named class using the indicated jar.  Otherwise, it
+ * will create an instance of DefaultBagFactory.
+ */
+public abstract class BagFactory {
+    private static BagFactory gSelf = null;
+    private static SpillableMemoryManager gMemMgr;
+
+    /**
+     * Get a reference to the singleton factory.
+     */
     public static BagFactory getInstance() {
-        return instance;
+        if (gSelf == null) {
+            String factoryName =
+                System.getProperty("pig.data.bag.factory.name");
+            String factoryJar =
+                System.getProperty("pig.data.bag.factory.jar");
+            if (factoryName != null && factoryJar != null) {
+                try {
+                    URL[] urls = new URL[1];
+                    urls[0] = new URL(factoryJar);
+                    ClassLoader loader = new URLClassLoader(urls,
+                        BagFactory.class.getClassLoader());
+                    Class c = Class.forName(factoryName, true, loader);
+                    Object o = c.newInstance();
+                    if (!(o instanceof BagFactory)) {
+                        throw new RuntimeException("Provided factory " +
+                            factoryName + " does not extend BagFactory!");
+                    }
+                    gSelf = (BagFactory)o;
+                } catch (Exception e) {
+                    if (e instanceof RuntimeException) {
+                        // We just threw this
+                        RuntimeException re = (RuntimeException)e;
+                        throw re;
+                    }
+                    throw new RuntimeException("Unable to instantiate "
+                        + "bag factory " + factoryName, e);
+                }
+            } else {
+                gSelf = new DefaultBagFactory();
+            }
+        }
+        return gSelf;
     }
+    
+    /**
+     * Get a default (unordered, not distinct) data bag.
+     */
+    public abstract DataBag newDefaultBag();
+
+    /**
+     * Get a sorted data bag.
+     * @param spec EvalSpec that controls how the data is sorted.
+     * If null, default comparator will be used.
+     */
+    public abstract DataBag newSortedBag(EvalSpec spec);
+    
+    /**
+     * Get a distinct data bag.
+     */
+    public abstract DataBag newDistinctBag();
 
-    private BagFactory() {
+    protected BagFactory() {
+        gMemMgr = new SpillableMemoryManager();
     }
 
-    public static void init(File tmpdir) {
-        instance.setTmpDir(tmpdir);
+    protected void registerBag(DataBag b) {
+        gMemMgr.registerSpillable(b);
     }
 
-    private void setTmpDir(File tmpdir) {
-        this.tmpdir = tmpdir;
-        this.tmpdir.mkdirs();
-    }
-    
-    // Get BigBag or Bag, depending on whether the temp directory has been set up
-    public DataBag getNewBag(Datum.DataType type) throws IOException {
-        if (tmpdir == null) return new DataBag(type);
-        else return getNewBigBag(type);
-    }
-    
-    // Need a Big Bag, dammit!
-    public BigDataBag getNewBigBag(Datum.DataType type) throws IOException {
-        if (tmpdir == null) throw new IOException("No temp directory given for BigDataBag.");
-        else return new BigDataBag(type, tmpdir);
+    /**
+     * Provided for testing purposes only.  This function should never be
+     * called by anybody but the unit tests.
+     */
+    public static void resetSelf() {
+        gSelf = null;
     }
 
 }
+