You are viewing a plain text version of this content. The canonical link to the original message is not preserved in this plain-text version.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/01/22 22:17:22 UTC
svn commit: r614325 [1/6] - in /incubator/pig/branches/types: ./ lib/
scripts/ src/org/apache/pig/ src/org/apache/pig/builtin/
src/org/apache/pig/data/ src/org/apache/pig/impl/
src/org/apache/pig/impl/builtin/ src/org/apache/pig/impl/eval/
src/org/apac...
Author: gates
Date: Tue Jan 22 13:17:12 2008
New Revision: 614325
URL: http://svn.apache.org/viewvc?rev=614325&view=rev
Log:
Beginning of rework to add new types using Java native objects instead of Pig-defined
classes. This code doesn't work yet: pig.jar compiles, but the
tests do not compile yet.
Added:
incubator/pig/branches/types/CHANGES.txt
- copied unchanged from r610055, incubator/pig/trunk/CHANGES.txt
incubator/pig/branches/types/lib/hadoop15.jar
- copied unchanged from r610055, incubator/pig/trunk/lib/hadoop15.jar
incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java
- copied unchanged from r610055, incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java
incubator/pig/branches/types/src/org/apache/pig/data/DataType.java
incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java
incubator/pig/branches/types/src/org/apache/pig/data/DefaultBagFactory.java
incubator/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java
- copied, changed from r610055, incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java
incubator/pig/branches/types/src/org/apache/pig/data/DefaultTupleFactory.java
incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java
- copied, changed from r610055, incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java
incubator/pig/branches/types/src/org/apache/pig/data/SortedDataBag.java
- copied, changed from r610055, incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java
incubator/pig/branches/types/src/org/apache/pig/data/TupleFactory.java
incubator/pig/branches/types/src/org/apache/pig/impl/eval/EvalSpecPrinter.java
- copied unchanged from r610055, incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpecPrinter.java
incubator/pig/branches/types/src/org/apache/pig/impl/eval/EvalSpecVisitor.java
- copied unchanged from r610055, incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpecVisitor.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java
- copied unchanged from r610055, incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOPrinter.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java
- copied unchanged from r610055, incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOVisitor.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POPrinter.java
- copied unchanged from r610055, incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POPrinter.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POVisitor.java
- copied unchanged from r610055, incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POVisitor.java
incubator/pig/branches/types/src/org/apache/pig/impl/util/Spillable.java
- copied unchanged from r610055, incubator/pig/trunk/src/org/apache/pig/impl/util/Spillable.java
incubator/pig/branches/types/src/org/apache/pig/impl/util/SpillableMemoryManager.java
- copied unchanged from r610055, incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java
incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java
- copied unchanged from r610055, incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
incubator/pig/branches/types/src/org/apache/pig/tools/pigscript/
- copied from r610055, incubator/pig/trunk/src/org/apache/pig/tools/pigscript/
incubator/pig/branches/types/src/org/apache/pig/tools/pigscript/parser/ (props changed)
- copied from r610055, incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/
incubator/pig/branches/types/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
- copied unchanged from r610055, incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
incubator/pig/branches/types/test/org/apache/pig/test/NonDefaultBagFactory.java
incubator/pig/branches/types/test/org/apache/pig/test/OrdAsc.java
- copied unchanged from r610055, incubator/pig/trunk/test/org/apache/pig/test/OrdAsc.java
incubator/pig/branches/types/test/org/apache/pig/test/OrdDesc.java
- copied unchanged from r610055, incubator/pig/trunk/test/org/apache/pig/test/OrdDesc.java
incubator/pig/branches/types/test/org/apache/pig/test/OrdDescNumeric.java
- copied unchanged from r610055, incubator/pig/trunk/test/org/apache/pig/test/OrdDescNumeric.java
incubator/pig/branches/types/test/org/apache/pig/test/TestOrderBy.java
- copied unchanged from r610055, incubator/pig/trunk/test/org/apache/pig/test/TestOrderBy.java
incubator/pig/branches/types/test/org/apache/pig/test/TestPigServer.java
- copied unchanged from r610055, incubator/pig/trunk/test/org/apache/pig/test/TestPigServer.java
Removed:
incubator/pig/branches/types/lib/hadoop13.jar
incubator/pig/branches/types/src/org/apache/pig/data/BigDataBag.java
incubator/pig/branches/types/src/org/apache/pig/data/DataAtom.java
incubator/pig/branches/types/src/org/apache/pig/data/DataCharArray.java
incubator/pig/branches/types/src/org/apache/pig/data/DataCharArrayNone.java
incubator/pig/branches/types/src/org/apache/pig/data/DataCharArrayUtf16.java
incubator/pig/branches/types/src/org/apache/pig/data/DataDouble.java
incubator/pig/branches/types/src/org/apache/pig/data/DataFloat.java
incubator/pig/branches/types/src/org/apache/pig/data/DataInteger.java
incubator/pig/branches/types/src/org/apache/pig/data/DataLong.java
incubator/pig/branches/types/src/org/apache/pig/data/DataMap.java
incubator/pig/branches/types/src/org/apache/pig/data/Datum.java
incubator/pig/branches/types/src/org/apache/pig/data/DatumImpl.java
incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.jj
incubator/pig/branches/types/test/com/
incubator/pig/branches/types/test/org/apache/pig/test/TestDataCharArrayNone.java
incubator/pig/branches/types/test/org/apache/pig/test/TestDataCharArrayUtf16.java
incubator/pig/branches/types/test/org/apache/pig/test/TestDataDouble.java
incubator/pig/branches/types/test/org/apache/pig/test/TestDataFloat.java
incubator/pig/branches/types/test/org/apache/pig/test/TestDataInteger.java
incubator/pig/branches/types/test/org/apache/pig/test/TestDataLong.java
incubator/pig/branches/types/test/org/apache/pig/test/TestDataMap.java
incubator/pig/branches/types/test/org/apache/pig/test/TestDataUnknown.java
incubator/pig/branches/types/test/org/apache/pig/test/TestTuple.java
incubator/pig/branches/types/test/reports/
Modified:
incubator/pig/branches/types/ (props changed)
incubator/pig/branches/types/build.xml
incubator/pig/branches/types/scripts/pig.pl
incubator/pig/branches/types/src/org/apache/pig/EvalFunc.java
incubator/pig/branches/types/src/org/apache/pig/Main.java
incubator/pig/branches/types/src/org/apache/pig/PigServer.java
incubator/pig/branches/types/src/org/apache/pig/builtin/ARITY.java
incubator/pig/branches/types/src/org/apache/pig/builtin/AVG.java
incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java
incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java
incubator/pig/branches/types/src/org/apache/pig/builtin/DIFF.java
incubator/pig/branches/types/src/org/apache/pig/builtin/IsEmpty.java
incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java
incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java
incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java
incubator/pig/branches/types/src/org/apache/pig/builtin/SUM.java
incubator/pig/branches/types/src/org/apache/pig/builtin/TOKENIZE.java
incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java
incubator/pig/branches/types/src/org/apache/pig/data/AmendableTuple.java
incubator/pig/branches/types/src/org/apache/pig/data/BagFactory.java
incubator/pig/branches/types/src/org/apache/pig/data/DataBag.java
incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java
incubator/pig/branches/types/src/org/apache/pig/data/TimestampedTuple.java
incubator/pig/branches/types/src/org/apache/pig/data/Tuple.java
incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java
incubator/pig/branches/types/src/org/apache/pig/impl/builtin/ADD.java
incubator/pig/branches/types/src/org/apache/pig/impl/builtin/DIVIDE.java
incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java
incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFAny.java
incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFCross.java
incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFReplicate.java
incubator/pig/branches/types/src/org/apache/pig/impl/builtin/MULTIPLY.java
incubator/pig/branches/types/src/org/apache/pig/impl/builtin/SUBTRACT.java
incubator/pig/branches/types/src/org/apache/pig/impl/builtin/ShellBagEvalFunc.java
incubator/pig/branches/types/src/org/apache/pig/impl/eval/BinCondSpec.java
incubator/pig/branches/types/src/org/apache/pig/impl/eval/CompositeEvalSpec.java
incubator/pig/branches/types/src/org/apache/pig/impl/eval/ConstSpec.java
incubator/pig/branches/types/src/org/apache/pig/impl/eval/EvalSpec.java
incubator/pig/branches/types/src/org/apache/pig/impl/eval/FilterSpec.java
incubator/pig/branches/types/src/org/apache/pig/impl/eval/FuncEvalSpec.java
incubator/pig/branches/types/src/org/apache/pig/impl/eval/GenerateSpec.java
incubator/pig/branches/types/src/org/apache/pig/impl/eval/MapLookupSpec.java
incubator/pig/branches/types/src/org/apache/pig/impl/eval/ProjectSpec.java
incubator/pig/branches/types/src/org/apache/pig/impl/eval/SimpleEvalSpec.java
incubator/pig/branches/types/src/org/apache/pig/impl/eval/SortDistinctSpec.java
incubator/pig/branches/types/src/org/apache/pig/impl/eval/StarSpec.java
incubator/pig/branches/types/src/org/apache/pig/impl/eval/collector/DataCollector.java
incubator/pig/branches/types/src/org/apache/pig/impl/eval/collector/FlattenCollector.java
incubator/pig/branches/types/src/org/apache/pig/impl/eval/collector/UnflattenCollector.java
incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/AndCond.java
incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/CompCond.java
incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/Cond.java
incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/FalseCond.java
incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/FuncCond.java
incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/NotCond.java
incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/OrCond.java
incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/RegexpCond.java
incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/TrueCond.java
incubator/pig/branches/types/src/org/apache/pig/impl/eval/window/TimeWindowSpec.java
incubator/pig/branches/types/src/org/apache/pig/impl/eval/window/TupleWindowSpec.java
incubator/pig/branches/types/src/org/apache/pig/impl/eval/window/WindowSpec.java
incubator/pig/branches/types/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java
incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileReader.java
incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileWriter.java
incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java
incubator/pig/branches/types/src/org/apache/pig/impl/io/PigFile.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEval.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORead.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplit.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStore.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/ (props changed)
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigCombine.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigInputFormat.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigMapReduce.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigSplit.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/SortPartitioner.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/IntermedResult.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POCogroup.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POEval.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POLoad.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POMapreduce.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PORead.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSort.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSplitMaster.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSplitSlave.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POStore.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POUnion.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalPlan.java
incubator/pig/branches/types/src/org/apache/pig/impl/util/DataBuffer.java
incubator/pig/branches/types/src/org/apache/pig/impl/util/JarManager.java
incubator/pig/branches/types/src/org/apache/pig/impl/util/PigLogger.java
incubator/pig/branches/types/src/org/apache/pig/tools/grunt/ (props changed)
incubator/pig/branches/types/test/ (props changed)
incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java
incubator/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java
incubator/pig/branches/types/test/org/apache/pig/test/TestDataBag.java
incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java
incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java
incubator/pig/branches/types/test/org/apache/pig/test/TestPigFile.java
incubator/pig/branches/types/test/org/apache/pig/test/Util.java
Propchange: incubator/pig/branches/types/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Jan 22 13:17:12 2008
@@ -0,0 +1,4 @@
+
+dist
+depend
+pig.jar
Modified: incubator/pig/branches/types/build.xml
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/build.xml (original)
+++ incubator/pig/branches/types/build.xml Tue Jan 22 13:17:12 2008
@@ -5,11 +5,12 @@
<property name="lib.dir" value="${basedir}/lib" />
<property name="src.dir" value="${basedir}/src" />
<property name="doc.dir" value="${basedir}/doc" />
+ <property name="test.reports.dir" value="${basedir}/test/reports" />
<property name="shock.src.dir" value="${basedir}/lib-src/shock" />
<property name="bzip2.src.dir" value="${basedir}/lib-src/bzip2" />
<property name="test.src.dir" value="${basedir}/test" />
<property name="output.jarfile" value="pig.jar" />
- <property name="hadoop.jarfile" value="hadoop14.jar"/>
+ <property name="hadoop.jarfile" value="hadoop15.jar"/>
<property name="ssh.gateway" value=""/>
<property name="hod.server" value=""/>
<property name="hod.command" value=""/>
@@ -26,6 +27,7 @@
<target name="clean">
<delete dir="${dist.dir}" />
<delete dir="${doc.dir}" />
+ <delete dir="${test.reports.dir}" />
<delete file="${output.jarfile}" />
<delete file="${src.dir}/org/apache/pig/impl/logicalLayer/parser/QueryParser.java" />
<delete file="${src.dir}/org/apache/pig/impl/logicalLayer/parser/JJTQueryParserState.java" />
@@ -39,24 +41,32 @@
<delete file="${src.dir}/org/apache/pig/impl/logicalLayer/parser/SimpleNode.java" />
<delete file="${src.dir}/org/apache/pig/impl/logicalLayer/parser/Token.java" />
<delete file="${src.dir}/org/apache/pig/impl/logicalLayer/parser/TokenMgrError.java" />
- <delete file="${src.dir}/org/apache/pig/tools/grunt/GruntParser.java" />
- <delete file="${src.dir}/org/apache/pig/tools/grunt/GruntParserConstants.java" />
- <delete file="${src.dir}/org/apache/pig/tools/grunt/GruntParserTokenManager.java" />
- <delete file="${src.dir}/org/apache/pig/tools/grunt/ParseException.java" />
- <delete file="${src.dir}/org/apache/pig/tools/grunt/SimpleCharStream.java" />
- <delete file="${src.dir}/org/apache/pig/tools/grunt/Token.java" />
- <delete file="${src.dir}/org/apache/pig/tools/grunt/TokenMgrError.java" />
+ <delete file="${src.dir}/org/apache/pig/tools/pigscript/parser/PigScriptParser.java" />
+ <delete file="${src.dir}/org/apache/pig/tools/pigscript/parser/PigScriptParserConstants.java" />
+ <delete file="${src.dir}/org/apache/pig/tools/pigscript/parser/PigScriptParserTokenManager.java" />
+ <delete file="${src.dir}/org/apache/pig/tools/pigscript/parser/ParseException.java" />
+ <delete file="${src.dir}/org/apache/pig/tools/pigscript/parser/SimpleCharStream.java" />
+ <delete file="${src.dir}/org/apache/pig/tools/pigscript/parser/Token.java" />
+ <delete file="${src.dir}/org/apache/pig/tools/pigscript/parser/TokenMgrError.java" />
</target>
- <target name="compile" depends="cc-compile">
+ <target name="depend">
+ <mkdir dir="depend"/>
+ <echo>*** Resolving dependencies ***</echo>
+ <depend srcdir="src;lib-src/shock;lib-src/bzip2" destdir="dist" cache="depend"/>
+ </target>
+
+ <target name="compile" depends="depend, cc-compile, lib-compile">
<mkdir dir="${dist.dir}" />
<echo>*** Building Main Sources ***</echo>
- <javac srcdir="${src.dir};${shock.src.dir};${bzip2.src.dir}" destdir="${dist.dir}" target="1.5" debug="on">
+ <javac srcdir="${src.dir};${shock.src.dir};${bzip2.src.dir}" destdir="${dist.dir}" target="1.5" debug="on" deprecation="on">
<classpath refid="classpath" />
+ <!--<compilerarg value="-Xlint:unchecked"/> -->
</javac>
<echo>*** Building Test Sources ***</echo>
<javac srcdir="test" destdir="${dist.dir}" debug="on">
<classpath refid="classpath" />
+ <compilerarg value="-Xlint:unchecked"/>
</javac>
</target>
@@ -70,9 +80,17 @@
outputdirectory="${src.dir}/org/apache/pig/impl/logicalLayer/parser"
javacchome="${basedir}/lib" />
<javacc
- target="${src.dir}/org/apache/pig/tools/grunt/GruntParser.jj"
- outputdirectory="${src.dir}/org/apache/pig/tools/grunt"
+ target="${src.dir}/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj"
+ outputdirectory="${src.dir}/org/apache/pig/tools/pigscript/parser"
javacchome="${basedir}/lib" />
+ </target>
+
+ <target name="lib-compile">
+ <mkdir dir="${dist.dir}" />
+ <echo>*** Building Library Sources ***</echo>
+ <javac srcdir="${shock.src.dir};${bzip2.src.dir}" destdir="${dist.dir}" target="1.5" debug="on">
+ <classpath refid="classpath" />
+ </javac>
</target>
<target name="jar" depends="compile">
Modified: incubator/pig/branches/types/scripts/pig.pl
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/scripts/pig.pl?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/scripts/pig.pl (original)
+++ incubator/pig/branches/types/scripts/pig.pl Tue Jan 22 13:17:12 2008
@@ -19,7 +19,7 @@
# Read our configuration file. This will fill in values for pigJarRoot
# and hodRoot.
our $ROOT = (defined($ENV{'ROOT'}) ? $ENV{'ROOT'} : "/home/y");
-my ($pigJarRoot, $hodRoot);
+my ($pigJarRoot, $hodRoot, $defaultCluster);
open(CFG, "< $ROOT/conf/pigclient.conf") or
die "Can't open $ROOT/conf/pigclient.conf, $ERRNO\n";
@@ -67,6 +67,16 @@
if (defined $classpath)
{
+ # Check to make sure that the jar file specified in the class path is
+ # available.
+ $classpath =~ /(^|:)([^:]*pig.jar)($|:)/;
+ my $jar = $2;
+ if (!(-e $jar)) {
+ die "I can't find the jar file $jar. If you explicitly
+put this jar in your classpath, please check that you have the path name
+correct. If you specified a cluster via -c[luster], then the pig jar for
+that cluster is not present on this machine.\n";
+ }
push (@javaArgs, "-cp", $classpath);
}
@@ -89,10 +99,10 @@
# If we aren't attaching to kryptonite, set up the right hod config file.
if ($cluster ne "kryptonite") {
- my $hodCfg = "$hodRoot/conf/$cluster";
- if (-e $hodCfg) {
- $hodParam .= "--config=$hodCfg";
- } else {
+ # With splitting of gateways, HOD file is always hodrc, no matter what
+ # cluster you're talking to.
+ my $hodCfg = "$hodRoot/conf/hodrc";
+ if (! (-e $hodCfg)) {
push(@cmd, "-Dhod.server=");
warn "I can't find HOD configuration for $cluster, hopefully you weren't planning on using HOD.\n";
}
@@ -118,8 +128,9 @@
# first, figure out if we are working with a deployed cluster
if (!(defined $cluster) && (!(defined $classpath) || !($classpath =~/pig.jar/)))
{
- # we are using default cluster
- $cluster = 'kryptonite';
+ # we are using default cluster, the name of which is stored in the
+ # pigclient.conf file.
+ $cluster = $defaultCluster;
}
# we are running from a cluster
Modified: incubator/pig/branches/types/src/org/apache/pig/EvalFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/EvalFunc.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/EvalFunc.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/EvalFunc.java Tue Jan 22 13:17:12 2008
@@ -21,7 +21,6 @@
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
-import org.apache.pig.data.Datum;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -43,7 +42,7 @@
* @author database-systems@yahoo.research
*
*/
-public abstract class EvalFunc<T extends Datum> {
+public abstract class EvalFunc<T> {
protected Type returnType;
@@ -70,9 +69,11 @@
returnType = parameters[0];
+ /*
if (returnType == Datum.class){
throw new RuntimeException("Eval function must return a specific type of Datum");
}
+ */
//Type check the initial, intermediate, and final functions
@@ -135,9 +136,10 @@
* invocations of this method.
*
* @param input the Tuple to be processed.
+ * @return result, of type T.
* @throws IOException
*/
- abstract public void exec(Tuple input, T output) throws IOException;
+ abstract public T exec(Tuple input) throws IOException;
/**
* @param input Schema of the input
Modified: incubator/pig/branches/types/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/Main.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/Main.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/Main.java Tue Jan 22 13:17:12 2008
@@ -25,10 +25,12 @@
import org.apache.hadoop.util.HadoopExe;
+import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.PatternLayout;
+import org.apache.log4j.PropertyConfigurator;
import org.apache.pig.PigServer.ExecType;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.PigLogger;
@@ -65,11 +67,15 @@
int port = 0;
String file = null;
Level logLevel = Level.INFO;
+ boolean brief = false;
+ String log4jconf = null;
boolean verbose = false;
CmdLineParser opts = new CmdLineParser(args);
// Don't use -l, --latest, -c, --cluster, -cp, -classpath, -D as these
// are masked by the startup perl script.
+ opts.registerOpt('4', "log4jconf", CmdLineParser.ValueExpected.REQUIRED);
+ opts.registerOpt('b', "brief", CmdLineParser.ValueExpected.NOT_ACCEPTED);
opts.registerOpt('c', "cluster", CmdLineParser.ValueExpected.REQUIRED);
opts.registerOpt('d', "debug", CmdLineParser.ValueExpected.REQUIRED);
opts.registerOpt('e', "execute", CmdLineParser.ValueExpected.NOT_ACCEPTED);
@@ -83,6 +89,14 @@
char opt;
while ((opt = opts.getNextOpt()) != CmdLineParser.EndOfOpts) {
switch (opt) {
+ case '4':
+ log4jconf = opts.getValStr();
+ break;
+
+ case 'b':
+ brief = true;
+ break;
+
case 'c': {
// Needed away to specify the cluster to run the MR job on
// Bug 831708 - fixed
@@ -155,14 +169,35 @@
LogicalPlanBuilder.classloader = pigContext.createCl(null);
- // Set the log level, and set up appenders
- Logger log = PigLogger.getLogger();
- log.setLevel(logLevel);
- ConsoleAppender screen = new ConsoleAppender(new PatternLayout());
- if (verbose) screen.setThreshold(logLevel);
- else screen.setThreshold(Level.INFO);
- screen.setTarget(ConsoleAppender.SYSTEM_ERR);
- log.addAppender(screen);
+ if (log4jconf != null) {
+ PropertyConfigurator.configure(log4jconf);
+ } else if (!brief) {
+ // non-brief logging - timestamps
+ Properties props = new Properties();
+ props.setProperty("log4j.rootLogger", "INFO, PIGCONSOLE");
+ props.setProperty("log4j.appender.PIGCONSOLE",
+ "org.apache.log4j.ConsoleAppender");
+ props.setProperty("log4j.appender.PIGCONSOLE.layout",
+ "org.apache.log4j.PatternLayout");
+ props.setProperty("log4j.appender.PIGCONSOLE.layout.ConversionPattern",
+ "%d [%t] %-5p %c - %m%n");
+ PropertyConfigurator.configure(props);
+ // Set the log level/threshold
+ Logger.getRootLogger().setLevel(verbose ? Level.ALL : logLevel);
+ } else {
+ // brief logging - no timestamps
+ Properties props = new Properties();
+ props.setProperty("log4j.rootLogger", "INFO, PIGCONSOLE");
+ props.setProperty("log4j.appender.PIGCONSOLE",
+ "org.apache.log4j.ConsoleAppender");
+ props.setProperty("log4j.appender.PIGCONSOLE.layout",
+ "org.apache.log4j.PatternLayout");
+ props.setProperty("log4j.appender.PIGCONSOLE.layout.ConversionPattern",
+ "%m%n");
+ PropertyConfigurator.configure(props);
+ // Set the log level/threshold
+ Logger.getRootLogger().setLevel(verbose ? Level.ALL : logLevel);
+ }
// TODO Add a file appender for the logs
// TODO Need to create a property in the properties file for it.
@@ -247,6 +282,8 @@
System.err.println(" Pig [options] -e[xecute] cmd [cmd ...] : Run cmd(s).");
System.err.println(" Pig [options] [-f[ile]] file : Run cmds found in file.");
System.err.println(" options include:");
+ System.err.println(" -4, -log4jconf log4j configuration file, overrides log conf");
+ System.err.println(" -b, -brief brief logging (no timestamps)");
System.err.println(" -c, -cluster clustername, kryptonite is default");
System.err.println(" -d, -debug debug level, INFO is default");
System.err.println(" -h, -help display this message");
Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Tue Jan 22 13:17:12 2008
@@ -20,16 +20,22 @@
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.io.PrintStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.Enumeration;
+import java.util.Vector;
+import java.net.URL;
+import java.net.URI;
+import java.net.URISyntaxException;
import org.apache.hadoop.dfs.DistributedFileSystem;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.Datum;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.io.FileLocalizer;
@@ -39,6 +45,8 @@
import org.apache.pig.impl.logicalLayer.LOStore;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.LOVisitor;
+import org.apache.pig.impl.logicalLayer.LOPrinter;
import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
import org.apache.pig.impl.logicalLayer.parser.ParseException;
import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
@@ -47,6 +55,9 @@
import org.apache.pig.impl.physicalLayer.POMapreduce;
import org.apache.pig.impl.physicalLayer.POStore;
import org.apache.pig.impl.physicalLayer.PhysicalPlan;
+import org.apache.pig.impl.physicalLayer.POVisitor;
+import org.apache.pig.impl.physicalLayer.POPrinter;
+import org.apache.pig.impl.util.PigLogger;
@@ -146,11 +157,59 @@
pigContext.registerFunction(function, functionSpec);
}
- public void registerJar(String path) throws IOException{
- File f = new File(path);
- if (!f.canRead())
- throw new IOException("Can't read " + path);
- pigContext.addJar(path);
+ private URL locateJarFromResources(String jarName) throws IOException {
+ Enumeration<URL> urls = ClassLoader.getSystemResources(jarName);
+ URL resourceLocation = null;
+
+ if (urls.hasMoreElements()) {
+ resourceLocation = urls.nextElement();
+ }
+
+ if (pigContext.debug && urls.hasMoreElements()) {
+ String logMessage = "Found multiple resources that match "
+ + jarName + ": " + resourceLocation;
+
+ while (urls.hasMoreElements()) {
+ logMessage += (logMessage + urls.nextElement() + "; ");
+ }
+
+ PigLogger.getLogger().debug(logMessage);
+ }
+
+ return resourceLocation;
+ }
+
+ /**
+ * Registers a jar file. Name of the jar file can be an absolute or
+ * relative path.
+ *
+ * If multiple resources are found with the specified name, the
+ * first one is registered as returned by getSystemResources.
+ * A warning is issued to inform the user.
+ *
+ * @param name of the jar file to register
+ * @throws IOException
+ */
+ public void registerJar(String name) throws IOException {
+ // first try to locate jar via system resources
+ // if this fails, try by using "name" as File (this preserves
+ // compatibility with case when user passes absolute path or path
+ // relative to current working directory.)
+ if (name != null) {
+ URL resource = locateJarFromResources(name);
+
+ if (resource == null) {
+ File f = new File(name);
+
+ if (!f.canRead()) {
+ throw new IOException("Can't read jar file: " + name);
+ }
+
+ resource = f.toURI().toURL();
+ }
+
+ pigContext.addJar(resource);
+ }
}
/**
@@ -244,12 +303,7 @@
pp = physicalPlans.get(readFrom);
}
- // Data bags are guaranteed to contain tuples.
- //return pp.exec(continueFromLast).content();
- // A direct subversion of the type system, this has to be bad.
- Iterator<Datum> i = pp.exec(continueFromLast).content();
- Object o = i;
- return (Iterator<Tuple>)o;
+ return pp.exec(continueFromLast).iterator();
}
@@ -265,10 +319,8 @@
readFrom.compile(queryResults);
readFrom.exec();
- if (pigContext.getExecType() == ExecType.LOCAL) {
- Object o = readFrom.read().content();
- return (Iterator<Tuple>)o;
- }
+ if (pigContext.getExecType() == ExecType.LOCAL)
+ return readFrom.read().iterator();
final LoadFunc p;
try{
@@ -470,6 +522,35 @@
}
/**
+ * Provide information on how a pig query will be executed. For now
+ * this information is very developer focused, and probably not very
+ * useful to the average user.
+ * @param alias Name of alias to explain.
+ * @param stream PrintStream to write explanation to.
+ * @throws IOException if the requested alias cannot be found.
+ */
+ public void explain(
+ String alias,
+ PrintStream stream) throws IOException {
+ IntermedResult ir = queryResults.get(alias);
+ if (ir == null) {
+ PigLogger.getLogger().error("Invalid alias: " + alias);
+ throw new IOException("Invalid alias: " + alias);
+ }
+ // Validate the alias first so nothing is printed for a bad alias.
+ stream.println("Logical Plan:");
+ LOVisitor lprinter = new LOPrinter(stream);
+ ir.lp.getRoot().visit(lprinter);
+
+ stream.println("-----------------------------------------------");
+ stream.println("Physical Plan:");
+ // have to first compile the plan
+ ir.compile(queryResults);
+ POVisitor pprinter = new POPrinter(stream);
+ ir.pp.root.visit(pprinter);
+ }
+
+ /**
* Returns the unused byte capacity of an HDFS filesystem. This value does
* not take into account a replication factor, as that can vary from file
* to file. Thus if you are using this to determine if you data set will fit
@@ -496,7 +577,7 @@
public long fileSize(String filename) throws IOException {
FileSystem dfs = pigContext.getDfs();
Path p = new Path(filename);
- long len = dfs.getLength(p);
+ long len = dfs.getFileStatus(p).getLen();
long replication = dfs.getDefaultReplication(); // did not work, for some reason: dfs.getReplication(p);
return len * replication;
}
@@ -518,10 +599,10 @@
}
public String[] listPaths(String dir) throws IOException {
- Path paths[] = pigContext.getDfs().listPaths(new Path(dir));
- String strPaths[] = new String[paths.length];
- for (int i = 0; i < paths.length; i++) {
- strPaths[i] = paths[i].toString();
+ FileStatus stats[] = pigContext.getDfs().listStatus(new Path(dir));
+ String strPaths[] = new String[stats.length];
+ for (int i = 0; i < stats.length; i++) {
+ strPaths[i] = stats[i].getPath().toString();
}
return strPaths;
}
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/ARITY.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/ARITY.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/ARITY.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/ARITY.java Tue Jan 22 13:17:12 2008
@@ -20,17 +20,17 @@
import java.io.IOException;
import org.apache.pig.EvalFunc;
-import org.apache.pig.data.DataAtom;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.AtomSchema;
import org.apache.pig.impl.logicalLayer.schema.Schema;
-public class ARITY extends EvalFunc<DataAtom> {
+public class ARITY extends EvalFunc<Integer> {
@Override
- public void exec(Tuple input, DataAtom output) throws IOException {
- output.setValue(input.arity());
+ public Integer exec(Tuple input) throws IOException {
+ return Integer.valueOf(input.size()); // arity = number of fields in the input tuple
}
+
@Override
public Schema outputSchema(Schema input) {
return new AtomSchema("arity");
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/AVG.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/AVG.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/AVG.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/AVG.java Tue Jan 22 13:17:12 2008
@@ -22,9 +22,9 @@
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
-import org.apache.pig.data.DataAtom;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.AtomSchema;
import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -33,10 +33,12 @@
* Generates the average of the values of the first field of a tuple. This class is Algebraic in
* implemenation, so if possible the execution will be split into a local and global application
*/
-public class AVG extends EvalFunc<DataAtom> implements Algebraic {
+public class AVG extends EvalFunc<Double> implements Algebraic {
+
+ private static TupleFactory mTupleFactory = TupleFactory.getInstance();
@Override
- public void exec(Tuple input, DataAtom output) throws IOException {
+ public Double exec(Tuple input) throws IOException {
double sum = sum(input);
double count = count(input);
@@ -44,7 +46,7 @@
if (count > 0)
avg = sum / count;
- output.setValue(avg);
+ return new Double(avg);
}
public String getInitial() {
@@ -61,79 +63,72 @@
static public class Initial extends EvalFunc<Tuple> {
@Override
- public void exec(Tuple input, Tuple output) throws IOException {
- try {
- output.appendField(new DataAtom(sum(input)));
- output.appendField(new DataAtom(count(input)));
- output.appendField(new DataAtom("processed by initial"));
- } catch(RuntimeException t) {
- throw new RuntimeException(t.getMessage() + ": " + input);
- }
+ public Tuple exec(Tuple input) throws IOException {
+ try {
+ Tuple partial = mTupleFactory.newTuple(2); // (sum, count), read back by combine()
+ partial.set(0, sum(input));
+ partial.set(1, count(input));
+ return partial;
+ } catch(RuntimeException e) {
+ throw new RuntimeException(e.getMessage() + ": " + input, e);
+ }
}
}
static public class Intermed extends EvalFunc<Tuple> {
@Override
- public void exec(Tuple input, Tuple output) throws IOException {
- combine(input.getBagField(0), output);
+ public Tuple exec(Tuple input) throws IOException {
+ DataBag b = (DataBag)input.get(0);
+ return combine(b);
}
}
- static public class Final extends EvalFunc<DataAtom> {
+ static public class Final extends EvalFunc<Double> {
@Override
- public void exec(Tuple input, DataAtom output) throws IOException {
- Tuple combined = new Tuple();
- if(input.getField(0) instanceof DataBag) {
- combine(input.getBagField(0), combined);
- } else {
- throw new RuntimeException("Bag not found in: " + input);
-
-
- //combined = input.getTupleField(0);
- }
- double sum = combined.getAtomField(0).numval();
- double count = combined.getAtomField(1).numval();
+ public Double exec(Tuple input) throws IOException {
+ DataBag b = (DataBag)input.get(0);
+ Tuple combined = combine(b);
+
+ double sum = (Double)combined.get(0);
+ double count = (Long)combined.get(1);
double avg = 0;
if (count > 0) {
avg = sum / count;
}
- output.setValue(avg);
+ return new Double(avg);
}
}
- static protected void combine(DataBag values, Tuple output) throws IOException {
+ static protected Tuple combine(DataBag values) throws IOException {
double sum = 0;
- double count = 0;
+ long count = 0;
- for (Iterator it = values.content(); it.hasNext();) {
- Tuple t = (Tuple) it.next();
-// if(!(t.getField(0) instanceof DataAtom)) {
-// throw new RuntimeException("Unexpected Type: " + t.getField(0).getClass().getName() + " in " + t);
-// }
-
- sum += t.getAtomField(0).numval();
- count += t.getAtomField(1).numval();
+ Tuple output = mTupleFactory.newTuple(2);
+
+ for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+ Tuple t = it.next();
+ sum += (Double)t.get(0);
+ count += (Long)t.get(1);
}
- output.appendField(new DataAtom(sum));
- output.appendField(new DataAtom(count));
+ output.set(0, new Double(sum));
+ output.set(1, new Long(count));
+ return output;
}
static protected long count(Tuple input) throws IOException {
- DataBag values = input.getBagField(0);
-
-
- return values.cardinality();
+ DataBag values = (DataBag)input.get(0);
+ return values.size();
}
static protected double sum(Tuple input) throws IOException {
- DataBag values = input.getBagField(0);
+ DataBag values = (DataBag)input.get(0);
double sum = 0;
- for (Iterator it = values.content(); it.hasNext();) {
- Tuple t = (Tuple) it.next();
- sum += t.getAtomField(0).numval();
+ for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+ Tuple t = it.next();
+ sum += (Double)t.get(0);
}
return sum;
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java Tue Jan 22 13:17:12 2008
@@ -26,11 +26,16 @@
import org.apache.pig.LoadFunc;
import org.apache.pig.StoreFunc;
+import org.apache.pig.data.DataReaderWriter;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
public class BinStorage implements LoadFunc, StoreFunc {
+ public static final byte RECORD_1 = 0x21;
+ public static final byte RECORD_2 = 0x31;
+ public static final byte RECORD_3 = 0x41;
+
Iterator<Tuple> i = null;
protected BufferedPositionedInputStream in = null;
private DataInputStream inData = null;
@@ -51,25 +56,23 @@
return null;
}
b = (byte) in.read();
- if(b != Tuple.RECORD_1 && b != -1) {
+ if(b != RECORD_1 && b != -1) {
continue;
}
if(b == -1) return null;
b = (byte) in.read();
- if(b != Tuple.RECORD_2 && b != -1) {
+ if(b != RECORD_2 && b != -1) {
continue;
}
if(b == -1) return null;
b = (byte) in.read();
- if(b != Tuple.RECORD_3 && b != -1) {
+ if(b != RECORD_3 && b != -1) {
continue;
}
if(b == -1) return null;
break;
}
- Tuple t = new Tuple();
- t.readFields(inData);
- return t;
+ return (Tuple)DataReaderWriter.readDatum(inData);
}
public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long end) throws IOException {
@@ -90,9 +93,9 @@
}
public void putNext(Tuple t) throws IOException {
- out.write(Tuple.RECORD_1);
- out.write(Tuple.RECORD_2);
- out.write(Tuple.RECORD_3);
+ out.write(RECORD_1);
+ out.write(RECORD_2);
+ out.write(RECORD_3);
t.write(out);
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java Tue Jan 22 13:17:12 2008
@@ -19,14 +19,14 @@
import java.io.IOException;
import java.util.Iterator;
+import java.util.Map;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
-import org.apache.pig.data.DataAtom;
import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataMap;
-import org.apache.pig.data.Datum;
+import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.AtomSchema;
import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -34,11 +34,11 @@
* Generates the count of the values of the first field of a tuple. This class is Algebraic in
* implemenation, so if possible the execution will be split into a local and global functions
*/
-public class COUNT extends EvalFunc<DataAtom> implements Algebraic{
+public class COUNT extends EvalFunc<Long> implements Algebraic{
@Override
- public void exec(Tuple input, DataAtom output) throws IOException {
- output.setValue(count(input));
+ public Long exec(Tuple input) throws IOException {
+ return count(input);
}
public String getInitial() {
@@ -54,43 +54,50 @@
}
static public class Initial extends EvalFunc<Tuple> {
+ TupleFactory tfact = TupleFactory.getInstance();
+
@Override
- public void exec(Tuple input, Tuple output) throws IOException {
- output.appendField(new DataAtom(count(input)));
+ public Tuple exec(Tuple input) throws IOException {
+ return tfact.newTuple(count(input));
}
}
static public class Intermed extends EvalFunc<Tuple> {
+ TupleFactory tfact = TupleFactory.getInstance();
+
@Override
- public void exec(Tuple input, Tuple output) throws IOException {
- output.appendField(new DataAtom(sum(input)));
+ public Tuple exec(Tuple input) throws IOException {
+ return tfact.newTuple(sum(input)); // add up the partial counts; count() would only count them
}
}
- static public class Final extends EvalFunc<DataAtom> {
+ static public class Final extends EvalFunc<Long> {
@Override
- public void exec(Tuple input, DataAtom output) throws IOException {
- output.setValue(sum(input));
+ public Long exec(Tuple input) throws IOException {
+ return sum(input);
}
}
- static protected long count(Tuple input) throws IOException {
- Datum values = input.getField(0);
+ static protected Long count(Tuple input) throws IOException {
+ Object values = input.get(0);
if (values instanceof DataBag)
- return ((DataBag)values).cardinality();
- else if (values instanceof DataMap)
- return ((DataMap)values).cardinality();
+ return ((DataBag)values).size();
+ else if (values instanceof Map)
+ return new Long(((Map)values).size());
else
- throw new IOException("Cannot count a " + values.getClass().getSimpleName());
+ throw new IOException("Cannot count a " +
+ DataType.findTypeName(values));
}
- static protected double sum(Tuple input) throws IOException {
- DataBag values = input.getBagField(0);
- double sum = 0;
- for (Iterator<Datum> it = values.content(); it.hasNext();) {
- Tuple t = (Tuple)it.next();
+ static protected Long sum(Tuple input) throws IOException {
+ DataBag values = (DataBag)input.get(0);
+ long sum = 0;
+ for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+ Tuple t = it.next();
try {
- sum += t.getAtomField(0).numval();
+ // Have faith here. Checking each value before the cast is
+ // just too much.
+ sum += (Long)t.get(0);
} catch (NumberFormatException exp) {
throw new IOException(exp.getClass().getName() + ":" + exp.getMessage());
}
@@ -98,7 +105,7 @@
return sum;
}
-@Override
+ @Override
public Schema outputSchema(Schema input) {
return new AtomSchema("count" + count++);
}
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/DIFF.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/DIFF.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/DIFF.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/DIFF.java Tue Jan 22 13:17:12 2008
@@ -21,11 +21,10 @@
import java.util.Iterator;
import org.apache.pig.EvalFunc;
-import org.apache.pig.data.DataAtom;
+import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.Datum;
-import org.apache.pig.data.AtomicDatum;
+import org.apache.pig.data.TupleFactory;
/**
@@ -37,46 +36,82 @@
*
*/
public class DIFF extends EvalFunc<DataBag> {
+ TupleFactory mTupleFactory = TupleFactory.getInstance();
+ BagFactory mBagFactory = BagFactory.getInstance();
+
/**
* Compares a tuple with two fields. Emits any differences.
* @param input a tuple with exactly two fields.
* @throws IOException if there are not exactly two fields in a tuple
*/
@Override
- public void exec(Tuple input, DataBag output) throws IOException {
- if (input.arity() != 2) {
- throw new IOException("DIFF must compare two fields not " + input.arity());
+ public DataBag exec(Tuple input) throws IOException {
+ if (input.size() != 2) {
+ throw new IOException("DIFF must compare two fields not " +
+ input.size());
}
- if (input.getField(0).getType() == Datum.DataType.BAG) {
- DataBag field1 = input.getBagField(0);
- DataBag field2 = input.getBagField(1);
- Iterator<Datum> it1 = field1.content();
- checkInBag(field2, it1, output);
- Iterator<Datum> it2 = field2.content();
- checkInBag(field1, it2, output);
+ DataBag output = mBagFactory.newDefaultBag();
+ Object o1 = input.get(0);
+ if (o1 instanceof DataBag) {
+ DataBag bag1 = (DataBag)o1;
+ DataBag bag2 = (DataBag)input.get(1);
+ computeDiff(bag1, bag2, output);
} else {
- AtomicDatum d1 = input.getAtomField(0);
- AtomicDatum d2 = input.getAtomField(1);
+ Object d1 = input.get(0);
+ Object d2 = input.get(1);
if (!d1.equals(d2)) {
- output.add(new Tuple(d1));
- output.add(new Tuple(d2));
+ output.add(mTupleFactory.newTuple(d1));
+ output.add(mTupleFactory.newTuple(d2));
}
}
+ return output;
}
- private void checkInBag(DataBag bag, Iterator<Datum> iterator, DataBag emitTo) throws IOException {
- while(iterator.hasNext()) {
- Datum t = iterator.next();
- Iterator<Datum> it2 = bag.content();
- boolean found = false;
- while(it2.hasNext()) {
- if (t.equals(it2.next())) {
- found = true;
- }
- }
- if (!found) {
- emitTo.add(t);
+ private void computeDiff(
+ DataBag bag1,
+ DataBag bag2,
+ DataBag emitTo) throws IOException {
+ // Copy each input into a distinct bag. This drops duplicates and
+ // gives both sides a sorted order, so the difference can be found
+ // with one merge pass instead of an n^2 lookup.
+ DataBag d1 = mBagFactory.newDistinctBag();
+ DataBag d2 = mBagFactory.newDistinctBag();
+ Iterator<Tuple> i1 = bag1.iterator();
+ Iterator<Tuple> i2 = bag2.iterator();
+ while (i1.hasNext()) d1.add(i1.next());
+ while (i2.hasNext()) d2.add(i2.next());
+
+ i1 = d1.iterator();
+ i2 = d2.iterator();
+
+ // A null current tuple means that side is exhausted; either bag may be empty.
+ Tuple t1 = i1.hasNext() ? i1.next() : null;
+ Tuple t2 = i2.hasNext() ? i2.next() : null;
+
+ while (t1 != null && t2 != null) {
+ int c = t1.compareTo(t2);
+ if (c < 0) {
+ // t1 is only in bag1: emit it and advance i1
+ emitTo.add(t1);
+ t1 = i1.hasNext() ? i1.next() : null;
+ } else if (c > 0) {
+ // t2 is only in bag2: emit it and advance i2
+ emitTo.add(t2);
+ t2 = i2.hasNext() ? i2.next() : null;
+ } else {
+ // in both bags: emit neither, advance both iterators
+ t1 = i1.hasNext() ? i1.next() : null;
+ t2 = i2.hasNext() ? i2.next() : null;
}
+ }
+
+ // One side ran out; everything left on the other side (including its
+ // current tuple, which has not been emitted yet) is a difference.
+ for (; t1 != null; t1 = i1.hasNext() ? i1.next() : null) {
+ emitTo.add(t1);
+ }
+ for (; t2 != null; t2 = i2.hasNext() ? i2.next() : null) {
+ emitTo.add(t2);
+ }
}
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/IsEmpty.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/IsEmpty.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/IsEmpty.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/IsEmpty.java Tue Jan 22 13:17:12 2008
@@ -18,25 +18,26 @@
package org.apache.pig.builtin;
import java.io.IOException;
+import java.util.Map;
import org.apache.pig.FilterFunc;
import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataMap;
-import org.apache.pig.data.Datum;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.DataType;
public class IsEmpty extends FilterFunc {
@Override
public boolean exec(Tuple input) throws IOException {
- Datum values = input.getField(0);
+ Object values = input.get(0);
if (values instanceof DataBag)
- return ((DataBag)values).cardinality() == 0;
- else if (values instanceof DataMap)
- return ((DataMap)values).cardinality() == 0;
+ return ((DataBag)values).size() == 0;
+ else if (values instanceof Map)
+ return ((Map)values).size() == 0;
else
- throw new IOException("Cannot test a " + values.getClass().getSimpleName() + " for emptiness.");
+ throw new IOException("Cannot test a " +
+ DataType.findTypeName(values) + " for emptiness.");
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java Tue Jan 22 13:17:12 2008
@@ -22,9 +22,9 @@
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
-import org.apache.pig.data.DataAtom;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.AtomSchema;
import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -32,58 +32,67 @@
/**
* Generates the max of the values of the first field of a tuple.
*/
-public class MAX extends EvalFunc<DataAtom> implements Algebraic {
+public class MAX extends EvalFunc<Double> implements Algebraic {
- @Override
- public void exec(Tuple input, DataAtom output) throws IOException {
- output.setValue(max(input));
- }
-
- public String getInitial() {
- return Initial.class.getName();
- }
-
- public String getIntermed() {
- return Initial.class.getName();
- }
-
- public String getFinal() {
- return Final.class.getName();
- }
-
- static public class Initial extends EvalFunc<Tuple> {
- @Override
- public void exec(Tuple input, Tuple output) throws IOException {
- output.appendField(new DataAtom(max(input)));
- }
- }
- static public class Final extends EvalFunc<DataAtom> {
- @Override
- public void exec(Tuple input, DataAtom output) throws IOException {
- output.setValue(max(input));
- }
- }
-
- static protected double max(Tuple input) throws IOException {
- DataBag values = input.getBagField(0);
-
- double curMax = Double.NEGATIVE_INFINITY;
- for (Iterator it = values.content(); it.hasNext();) {
- Tuple t = (Tuple) it.next();
- try {
- curMax = java.lang.Math.max(curMax, t.getAtomField(0).numval());
- }catch(RuntimeException exp) {
- throw new IOException("Error processing: " + t.toString(), exp);
-
- }
- }
-
- return curMax;
- }
- @Override
- public Schema outputSchema(Schema input) {
- return new AtomSchema("max" + count++);
- }
+ @Override
+ public Double exec(Tuple input) throws IOException {
+ return max(input);
+ }
+
+ public String getInitial() {
+ return Initial.class.getName();
+ }
+
+ public String getIntermed() {
+ return Initial.class.getName();
+ }
+
+ public String getFinal() {
+ return Final.class.getName();
+ }
+
+ static public class Initial extends EvalFunc<Tuple> {
+ TupleFactory tfact = TupleFactory.getInstance();
+
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ return tfact.newTuple(max(input));
+ }
+ }
+ static public class Final extends EvalFunc<Double> {
+ @Override
+ public Double exec(Tuple input) throws IOException {
+ return max(input);
+ }
+ }
+
+ static protected Double max(Tuple input) throws IOException {
+ Object o = input.get(0);
+ if (!(o instanceof DataBag)) {
+ throw new IOException("Input to max function should be a bag");
+ }
+ DataBag values = (DataBag)o;
+ // Largest field-0 value in the bag; -Infinity if the bag is empty.
+ double curMax = Double.NEGATIVE_INFINITY;
+ for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+ Tuple t = it.next();
+ try {
+ curMax = java.lang.Math.max(curMax, (Double)t.get(0));
+ } catch (RuntimeException exp) {
+ IOException newE = new IOException("Error processing: " +
+ t.toString() + ": " + exp.getMessage());
+ newE.initCause(exp);
+ throw newE;
+ }
+ }
+
+ return curMax;
+ }
+
+ @Override
+ public Schema outputSchema(Schema input) {
+ return new AtomSchema("max" + count++);
+ }
- private static int count = 1;
+ private static int count = 1;
}
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java Tue Jan 22 13:17:12 2008
@@ -22,9 +22,9 @@
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
-import org.apache.pig.data.DataAtom;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.AtomSchema;
import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -32,57 +32,63 @@
/**
* Generates the min of the values of the first field of a tuple.
*/
-public class MIN extends EvalFunc<DataAtom> implements Algebraic {
+public class MIN extends EvalFunc<Double> implements Algebraic {
- @Override
- public void exec(Tuple input, DataAtom output) throws IOException {
- output.setValue(min(input));
- }
-
- public String getInitial() {
- return Initial.class.getName();
- }
-
- public String getIntermed() {
- return Initial.class.getName();
- }
-
- public String getFinal() {
- return Final.class.getName();
- }
-
- static public class Initial extends EvalFunc<Tuple> {
- @Override
- public void exec(Tuple input, Tuple output) throws IOException {
- output.appendField(new DataAtom(min(input)));
- }
- }
- static public class Final extends EvalFunc<DataAtom> {
- @Override
- public void exec(Tuple input, DataAtom output) throws IOException {
- output.setValue(min(input));
- }
- }
-
- static protected double min(Tuple input) throws IOException {
- DataBag values = input.getBagField(0);
-
- double curMin = Double.POSITIVE_INFINITY;
- for (Iterator it = values.content(); it.hasNext();) {
- Tuple t = (Tuple) it.next();
- try {
- curMin = java.lang.Math.min(curMin, t.getAtomField(0).numval());
- }catch(RuntimeException exp) {
- throw new IOException("Error processing: " + t.toString(), exp);
- }
- }
-
- return curMin;
-}
- @Override
- public Schema outputSchema(Schema input) {
- return new AtomSchema("min" + count++);
- }
+ @Override
+ public Double exec(Tuple input) throws IOException {
+ return min(input);
+ }
+
+ public String getInitial() {
+ return Initial.class.getName();
+ }
+
+ public String getIntermed() {
+ return Initial.class.getName();
+ }
+
+ public String getFinal() {
+ return Final.class.getName();
+ }
+
+ static public class Initial extends EvalFunc<Tuple> {
+ TupleFactory tfact = TupleFactory.getInstance();
+
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ return tfact.newTuple(min(input));
+ }
+ }
+ static public class Final extends EvalFunc<Double> {
+ @Override
+ public Double exec(Tuple input) throws IOException {
+ return min(input);
+ }
+ }
+
+ static protected Double min(Tuple input) throws IOException {
+ Object o = input.get(0);
+ if (!(o instanceof DataBag)) { throw new IOException("Input to min function should be a bag"); }
+ double curMin = Double.POSITIVE_INFINITY; // result for an empty bag
+ for (Iterator<Tuple> it = ((DataBag)o).iterator(); it.hasNext();) {
+ Tuple t = it.next();
+ try {
+ curMin = java.lang.Math.min(curMin, (Double)t.get(0));
+ } catch (RuntimeException exp) {
+ IOException newE = new IOException("Error processing: " +
+ t.toString() + ": " + exp.getMessage());
+ newE.initCause(exp);
+ throw newE;
+ }
+ }
+
+ return Double.valueOf(curMin);
+ }
+
+ @Override
+ public Schema outputSchema(Schema input) {
+ return new AtomSchema("min" + count++);
+ }
- private static int count = 1;
+ private static int count = 1;
}
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java Tue Jan 22 13:17:12 2008
@@ -18,18 +18,22 @@
package org.apache.pig.builtin;
import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
import java.util.Iterator;
import org.apache.pig.LoadFunc;
import org.apache.pig.StoreFunc;
-import org.apache.pig.data.TimestampedTuple;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
@@ -40,11 +44,13 @@
*/
public class PigStorage implements LoadFunc, StoreFunc {
protected BufferedPositionedInputStream in = null;
- private DataInputStream inData = null;
- long end = Long.MAX_VALUE;
- private String recordDel = "\n";
- private String fieldDel = "\t";
+ long end = Long.MAX_VALUE;
+ private byte recordDel = '\n';
+ private byte fieldDel = '\t';
+ private ByteArrayOutputStream mBuf = new ByteArrayOutputStream(4096); // eager: the no-arg ctor does not allocate it
+ private ArrayList<Object> mProtoTuple = new ArrayList<Object>(); // eager: the no-arg ctor does not allocate it
+ private TupleFactory mTupleFactory = TupleFactory.getInstance();
public PigStorage() {
}
@@ -53,28 +59,44 @@
* Constructs a Pig loader that uses specified regex as a field delimiter.
*
* @param delimiter
- * the regular expression that is used to separate fields. ("\t" is the default.) See
- * http://java.sun.com/j2se/1.5.0/docs/api/java/util/regex/Pattern.html for complete
- * explanation.
+ * the single byte character that is used to separate fields.
+ * ("\t" is the default.)
*/
public PigStorage(String delimiter) {
- this.fieldDel = delimiter;
+ this.fieldDel = (byte)delimiter.charAt(0);
+ mBuf = new ByteArrayOutputStream(4096);
+ mProtoTuple = new ArrayList<Object>();
}
- public Tuple getNext() throws IOException {
+ public Tuple getNext() throws IOException {
if (in == null || in.getPosition() > end) {
return null;
}
- String line;
- if((line = inData.readLine()) != null) {
- return new Tuple(line, fieldDel);
+
+ mBuf.reset();
+ while (true) {
+ // Hadoop's FSDataInputStream (which my input stream is based
+ // on at some point) is buffered, so I don't need to buffer.
+ int b = in.read();
+ // read() yields 0..255 (or -1), so compare with the unsigned delimiter values.
+ if (b == (fieldDel & 0xff)) {
+ readField();
+ } else if (b == (recordDel & 0xff) || (b == -1 && (mBuf.size() > 0 || !mProtoTuple.isEmpty()))) {
+ readField(); // end of record, or an unterminated last record at EOF
+ Tuple t = mTupleFactory.newTuple(mProtoTuple);
+ mProtoTuple.clear();
+ return t;
+ } else if (b == -1) {
+ // hit end of file with nothing pending
+ return null;
+ } else {
+ mBuf.write(b);
+ }
}
- return null;
}
- public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long end) throws IOException {
+ public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long end) throws IOException {
this.in = in;
- inData = new DataInputStream(in);
this.end = end;
// Since we are not block aligned we throw away the first
@@ -84,16 +106,87 @@
}
}
- OutputStream os;
+ OutputStream mOut;
public void bindTo(OutputStream os) throws IOException {
- this.os = os;
+ mOut = os;
}
public void putNext(Tuple f) throws IOException {
- os.write((f.toDelimitedString(this.fieldDel) + this.recordDel).getBytes());
+ // I have to convert integer fields to string, and then to bytes.
+ // If I use a DataOutputStream to convert directly from integer to
+ // bytes, I don't get a string representation.
+ int sz = f.size();
+ for (int i = 0; i < sz; i++) {
+ Object field = f.get(i);
+ switch (DataType.findType(field)) {
+ case DataType.NULL:
+ break; // just leave it empty
+
+ case DataType.BOOLEAN:
+ mOut.write(((Boolean)field).toString().getBytes());
+ break;
+
+ case DataType.INTEGER:
+ mOut.write(((Integer)field).toString().getBytes());
+ break;
+
+ case DataType.LONG:
+ mOut.write(((Long)field).toString().getBytes());
+ break;
+
+ case DataType.FLOAT:
+ mOut.write(((Float)field).toString().getBytes());
+ break;
+
+ case DataType.DOUBLE:
+ mOut.write(((Double)field).toString().getBytes());
+ break;
+
+ case DataType.BYTEARRAY: {
+ byte[] b = ((DataByteArray)field).get();
+ mOut.write(b, 0, b.length);
+ break;
+ }
+
+ case DataType.CHARARRAY:
+ // Encode explicitly as UTF-8; the platform default charset varies by machine.
+ mOut.write(((String)field).getBytes("UTF-8"));
+ break;
+
+ case DataType.MAP:
+ case DataType.TUPLE:
+ case DataType.BAG:
+ throw new IOException("Cannot store a non-flat tuple " +
+ "using PigStorage");
+
+ default:
+ throw new RuntimeException("Unknown datatype " +
+ DataType.findType(field));
+ }
+
+ if (i == sz - 1) {
+ // last field in tuple.
+ mOut.write(recordDel);
+ } else {
+ mOut.write(fieldDel);
+ }
+ }
}
public void finish() throws IOException {
+ }
+
+ private void readField() {
+ if (mBuf.size() == 0) {
+ // NULL value
+ mProtoTuple.add(null);
+ } else {
+ // TODO, once this can take schemas, we need to figure out
+ // if the user requested this to be viewed as a certain
+ // type, and if so, then construct it appropriately.
+ mProtoTuple.add(new DataByteArray(mBuf.toByteArray()));
+ }
+ mBuf.reset();
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/SUM.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/SUM.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/SUM.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/SUM.java Tue Jan 22 13:17:12 2008
@@ -22,9 +22,9 @@
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
-import org.apache.pig.data.DataAtom;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.AtomSchema;
import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -32,11 +32,11 @@
/**
* Generates the sum of the values of the first field of a tuple.
*/
-public class SUM extends EvalFunc<DataAtom> implements Algebraic {
+public class SUM extends EvalFunc<Double> implements Algebraic {
@Override
- public void exec(Tuple input, DataAtom output) throws IOException {
- output.setValue(sum(input));
+ public Double exec(Tuple input) throws IOException {
+ return sum(input);
}
public String getInitial() {
@@ -52,40 +52,43 @@
}
static public class Initial extends EvalFunc<Tuple> {
+ TupleFactory tfact = TupleFactory.getInstance();
+
@Override
- public void exec(Tuple input, Tuple output) throws IOException {
- output.appendField(new DataAtom(sum(input)));
+ public Tuple exec(Tuple input) throws IOException {
+ return tfact.newTuple(sum(input));
}
}
- static public class Final extends EvalFunc<DataAtom> {
+ static public class Final extends EvalFunc<Double> {
@Override
- public void exec(Tuple input, DataAtom output) throws IOException {
- output.setValue(sum(input));
+ public Double exec(Tuple input) throws IOException {
+ return sum(input);
}
}
static protected double sum(Tuple input) throws IOException {
- DataBag values = input.getBagField(0);
+ DataBag values = (DataBag)input.get(0);
double sum = 0;
- int i = 0;
+ int i = 0;
Tuple t = null;
- for (Iterator it = values.content(); it.hasNext();) {
+ for (Iterator it = values.iterator(); it.hasNext();) {
try {
t = (Tuple) it.next();
- i++;
- sum += t.getAtomField(0).numval();
+ i++;
+ sum += (Double)t.get(0);
}catch(RuntimeException exp) {
- String msg = "iteration = " + i + "bag size = " + values.cardinality() + " partial sum = " + sum + "\n";
- if (t != null)
- msg += "previous tupple = " + t.toString();
- throw new RuntimeException(exp.getMessage() + " additional info: " + msg);
- //throw new RuntimeException(exp.getMessage() + " error processing: " + t.toString());
+ String msg = "iteration = " + i + "bag size = " +
+ values.size() + " partial sum = " + sum + "\n";
+ if (t != null)
+ msg += "previous tupple = " + t.toString();
+ throw new RuntimeException(exp.getMessage() + " additional info: " + msg);
}
}
return sum;
}
+
@Override
public Schema outputSchema(Schema input) {
return new AtomSchema("sum" + count++);
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/TOKENIZE.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/TOKENIZE.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/TOKENIZE.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/TOKENIZE.java Tue Jan 22 13:17:12 2008
@@ -21,22 +21,28 @@
import java.util.StringTokenizer;
import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.AtomSchema;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
public class TOKENIZE extends EvalFunc<DataBag> {
+ TupleFactory mTupleFactory = TupleFactory.getInstance();
+ BagFactory mBagFactory = BagFactory.getInstance();
@Override
- public void exec(Tuple input, DataBag output) throws IOException {
- String str = input.getAtomField(0).strval();
+ public DataBag exec(Tuple input) throws IOException {
+ DataBag output = mBagFactory.newDefaultBag();
+ String str = (String)input.get(0);
StringTokenizer tok = new StringTokenizer(str, " \",()*", false);
while (tok.hasMoreTokens()) {
- output.add(new Tuple(tok.nextToken()));
+ output.add(mTupleFactory.newTuple(tok.nextToken()));
}
+ return output;
}
@Override
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java Tue Jan 22 13:17:12 2008
@@ -17,12 +17,13 @@
*/
package org.apache.pig.builtin;
-import java.io.DataInputStream;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
import java.io.IOException;
import org.apache.pig.LoadFunc;
-import org.apache.pig.data.DataAtom;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
@@ -32,13 +33,13 @@
*/
public class TextLoader implements LoadFunc{
BufferedPositionedInputStream in;
- private DataInputStream inData = null;
-
+ private BufferedReader inData = null;
long end;
+ private TupleFactory mTupleFactory = TupleFactory.getInstance();
public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long end) throws IOException {
this.in = in;
- inData = new DataInputStream(in);
+ inData = new BufferedReader(new InputStreamReader(in, "UTF8"));
this.end = end;
// Since we are not block aligned we throw away the first
// record and cound on a different instance to read it
@@ -51,9 +52,7 @@
return null;
String line;
if ((line = inData.readLine()) != null) {
- Tuple t = new Tuple(1);
- t.setField(0, new DataAtom(line));
- return t;
+ return mTupleFactory.newTuple(new String(line));
}
return null;
}
Modified: incubator/pig/branches/types/src/org/apache/pig/data/AmendableTuple.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/AmendableTuple.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/AmendableTuple.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/AmendableTuple.java Tue Jan 22 13:17:12 2008
@@ -17,18 +17,18 @@
*/
package org.apache.pig.data;
-public class AmendableTuple extends Tuple {
- Datum amendKey; // the identifier of the group to which this tuple belongs.
+public class AmendableTuple extends DefaultTuple {
+ Object amendKey; // the identifier of the group to which this tuple belongs.
- public AmendableTuple(int numFields, Datum amendKey) {
+ public AmendableTuple(int numFields, Object amendKey) {
super(numFields);
this.amendKey = amendKey;
}
- public Datum getAmendKey() {
+ public Object getAmendKey() {
return amendKey;
}
- public void setAmendKey(Datum amendKey) {
+ public void setAmendKey(Object amendKey) {
this.amendKey = amendKey;
}
Modified: incubator/pig/branches/types/src/org/apache/pig/data/BagFactory.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/BagFactory.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/BagFactory.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/BagFactory.java Tue Jan 22 13:17:12 2008
@@ -17,40 +17,99 @@
*/
package org.apache.pig.data;
-import java.io.File;
-import java.io.IOException;
-
-public class BagFactory {
-
- private File tmpdir = null;
- private static BagFactory instance = new BagFactory();
-
+import java.lang.Class;
+import java.lang.ClassLoader;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import org.apache.pig.impl.eval.EvalSpec;
+import org.apache.pig.impl.util.SpillableMemoryManager;
+
+/**
+ * A bag factory. Can be used to generate different types of bags
+ * depending on what is needed. This class is abstract so that users can
+ * override the bag factory if they desire to provide their own that
+ * returns their implementation of a bag. If the property
+ * pig.data.bag.factory.name is set to a class name and
+ * pig.data.bag.factory.jar is set to a URL pointing to a jar that
+ * contains the above named class, then getInstance() will create a
+ * a instance of the named class using the indicatd jar. Otherwise, it
+ * will create and instance of DefaultBagFactory.
+ */
+public abstract class BagFactory {
+ private static BagFactory gSelf = null;
+ private static SpillableMemoryManager gMemMgr;
+
+ /**
+ * Get a reference to the singleton factory.
+ */
public static BagFactory getInstance() {
- return instance;
+ if (gSelf == null) {
+ String factoryName =
+ System.getProperty("pig.data.bag.factory.name");
+ String factoryJar =
+ System.getProperty("pig.data.bag.factory.jar");
+ if (factoryName != null && factoryJar != null) {
+ try {
+ URL[] urls = new URL[1];
+ urls[0] = new URL(factoryJar);
+ ClassLoader loader = new URLClassLoader(urls,
+ BagFactory.class.getClassLoader());
+ Class c = Class.forName(factoryName, true, loader);
+ Object o = c.newInstance();
+ if (!(o instanceof BagFactory)) {
+ throw new RuntimeException("Provided factory " +
+ factoryName + " does not extend BagFactory!");
+ }
+ gSelf = (BagFactory)o;
+ } catch (Exception e) {
+ if (e instanceof RuntimeException) {
+ // We just threw this
+ RuntimeException re = (RuntimeException)e;
+ throw re;
+ }
+ throw new RuntimeException("Unable to instantiate "
+ + "bag factory " + factoryName, e);
+ }
+ } else {
+ gSelf = new DefaultBagFactory();
+ }
+ }
+ return gSelf;
}
+
+ /**
+ * Get a default (unordered, not distinct) data bag.
+ */
+ public abstract DataBag newDefaultBag();
+
+ /**
+ * Get a sorted data bag.
+ * @param spec EvalSpec that controls how the data is sorted.
+ * If null, default comparator will be used.
+ */
+ public abstract DataBag newSortedBag(EvalSpec spec);
+
+ /**
+ * Get a distinct data bag.
+ */
+ public abstract DataBag newDistinctBag();
- private BagFactory() {
+ protected BagFactory() {
+ gMemMgr = new SpillableMemoryManager();
}
- public static void init(File tmpdir) {
- instance.setTmpDir(tmpdir);
+ protected void registerBag(DataBag b) {
+ gMemMgr.registerSpillable(b);
}
- private void setTmpDir(File tmpdir) {
- this.tmpdir = tmpdir;
- this.tmpdir.mkdirs();
- }
-
- // Get BigBag or Bag, depending on whether the temp directory has been set up
- public DataBag getNewBag(Datum.DataType type) throws IOException {
- if (tmpdir == null) return new DataBag(type);
- else return getNewBigBag(type);
- }
-
- // Need a Big Bag, dammit!
- public BigDataBag getNewBigBag(Datum.DataType type) throws IOException {
- if (tmpdir == null) throw new IOException("No temp directory given for BigDataBag.");
- else return new BigDataBag(type, tmpdir);
+ /**
+ * Provided for testing purposes only. This function should never be
+ * called by anybody but the unit tests.
+ */
+ public static void resetSelf() {
+ gSelf = null;
}
}
+