You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/01/22 22:17:22 UTC
svn commit: r614325 [5/6] - in /incubator/pig/branches/types: ./ lib/
scripts/ src/org/apache/pig/ src/org/apache/pig/builtin/
src/org/apache/pig/data/ src/org/apache/pig/impl/
src/org/apache/pig/impl/builtin/ src/org/apache/pig/impl/eval/
src/org/apac...
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POCogroup.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POCogroup.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POCogroup.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POCogroup.java Tue Jan 22 13:17:12 2008
@@ -26,8 +26,9 @@
import org.apache.pig.data.AmendableTuple;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Datum;
+import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.eval.EvalSpec;
import org.apache.pig.impl.eval.collector.DataCollector;
import org.apache.pig.impl.logicalLayer.LOCogroup;
@@ -40,7 +41,7 @@
*
*/
private static final long serialVersionUID = 1L;
- List<Datum[]>[] sortedInputs;
+ List<Object[]>[] sortedInputs;
List<EvalSpec> specs;
public POCogroup(List<EvalSpec> specs, int outputType) {
@@ -61,11 +62,11 @@
for (int i = 0; i < inputs.length; i++) {
final int finalI = i;
- sortedInputs[i] = new ArrayList<Datum[]>();
+ sortedInputs[i] = new ArrayList<Object[]>();
DataCollector outputFromSpec = new DataCollector(null){
@Override
- public void add(Datum d) {
+ public void add(Object d) {
sortedInputs[finalI].add(LOCogroup.getGroupAndTuple(d));
}
};
@@ -78,9 +79,9 @@
}
inputToSpec.finishPipe();
- Collections.sort(sortedInputs[i], new Comparator<Datum[]>() {
- public int compare(Datum[] a, Datum[] b) {
- return a[0].compareTo(b[0]);
+ Collections.sort(sortedInputs[i], new Comparator<Object[]>() {
+ public int compare(Object[] a, Object[] b) {
+ return DataType.compare(a[0], b[0]);
}
});
}
@@ -95,11 +96,11 @@
// find the smallest group among all inputs (this is the group we should make a tuple
// out of)
- Datum smallestGroup = null;
+ Object smallestGroup = null;
for (int i = 0; i < inputs.length; i++) {
if (sortedInputs[i].size() > 0) {
- Datum g = (sortedInputs[i].get(0))[0];
- if (smallestGroup == null || g.compareTo(smallestGroup)<0)
+ Object g = (sortedInputs[i].get(0))[0];
+ if (smallestGroup == null || DataType.compare(g, smallestGroup)<0)
smallestGroup = g;
}
}
@@ -112,26 +113,26 @@
Tuple output;
if (outputType == LogicalOperator.AMENDABLE) output = new AmendableTuple(1 + inputs.length, smallestGroup);
- else output = new Tuple(1 + inputs.length);
+ else output = TupleFactory.getInstance().newTuple(1 + inputs.length);
// set first field to the group tuple
- output.setField(0, smallestGroup);
+ output.set(0, smallestGroup);
if (lineageTracer != null) lineageTracer.insert(output);
boolean done = true;
for (int i = 0; i < inputs.length; i++) {
- DataBag b =
- BagFactory.getInstance().getNewBag(Datum.DataType.TUPLE);
+ DataBag b = BagFactory.getInstance().newDefaultBag();
while (sortedInputs[i].size() > 0) {
- Datum g = sortedInputs[i].get(0)[0];
+ Object g = sortedInputs[i].get(0)[0];
Tuple t = (Tuple) sortedInputs[i].get(0)[1];
- if (g.compareTo(smallestGroup) < 0) {
+ int c = DataType.compare(g, smallestGroup);
+ if (c < 0) {
sortedInputs[i].remove(0); // discard this tuple
- } else if (g.equals(smallestGroup)) {
+ } else if (c == 0) {
b.add(t);
if (lineageTracer != null) lineageTracer.union(t, output); // update lineage
sortedInputs[i].remove(0);
@@ -140,17 +141,21 @@
}
}
- if (specs.get(i).isInner() && b.isEmpty())
+ if (specs.get(i).isInner() && (b.size() == 0))
done = false; // this input uses "inner" semantics, and it has no tuples for
// this group, so suppress the tuple we're currently building
- output.setField(1 + i, b);
+ output.set(1 + i, b);
}
if (done)
return output;
}
+ }
+
+ public void visit(POVisitor v) {
+ v.visitCogroup(this);
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POEval.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POEval.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POEval.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POEval.java Tue Jan 22 13:17:12 2008
@@ -97,4 +97,8 @@
}
}
+ public void visit(POVisitor v) {
+ v.visitEval(this);
+ }
+
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POLoad.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POLoad.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POLoad.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POLoad.java Tue Jan 22 13:17:12 2008
@@ -65,4 +65,9 @@
return lf.getNext();
}
+ @Override
+ public void visit(POVisitor v) {
+ v.visitLoad(this);
+ }
+
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POMapreduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POMapreduce.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POMapreduce.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POMapreduce.java Tue Jan 22 13:17:12 2008
@@ -19,7 +19,12 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Comparator;
+import org.apache.log4j.Logger;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.eval.EvalSpec;
@@ -27,25 +32,28 @@
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.mapreduceExec.MapReduceLauncher;
import org.apache.pig.impl.util.ObjectSerializer;
-
+import org.apache.pig.impl.util.PigLogger;
public class POMapreduce extends PhysicalOperator {
private static final long serialVersionUID = 1L;
public ArrayList<EvalSpec> toMap = new ArrayList<EvalSpec>();
- public ArrayList<EvalSpec> toCombine = null;
+ //public ArrayList<EvalSpec> toCombine = null;
+ public EvalSpec toCombine = null;
public EvalSpec toReduce = null;
public ArrayList<EvalSpec> groupFuncs = null;
public SplitSpec toSplit = null;
public ArrayList<FileSpec> inputFileSpecs = new ArrayList<FileSpec>();
public FileSpec outputFileSpec = null;
public Class partitionFunction = null;
+ public Class<WritableComparator> userComparator = null;
public String quantilesFile = null;
public PigContext pigContext;
public int mapParallelism = -1; // -1 means let hadoop decide
public int reduceParallelism = -1;
+
static MapReduceLauncher mapReduceLauncher = new MapReduceLauncher();
@@ -156,16 +164,16 @@
}
void print() {
- System.out.println("\n----- MapReduce Job -----");
- System.out.println("Input: " + inputFileSpecs);
- System.out.println("Map: " + toMap);
- System.out.println("Group: " + groupFuncs);
- System.out.println("Combine: " + toCombine);
- System.out.println("Reduce: " + toReduce);
- System.out.println("Output: " + outputFileSpec);
- System.out.println("Split: " + toSplit);
- System.out.println("Map parallelism: " + mapParallelism);
- System.out.println("Reduce parallelism: " + reduceParallelism);
+ Logger log = PigLogger.getLogger();
+ log.debug("Input: " + inputFileSpecs);
+ log.debug("Map: " + toMap);
+ log.debug("Group: " + groupFuncs);
+ log.debug("Combine: " + toCombine);
+ log.debug("Reduce: " + toReduce);
+ log.debug("Output: " + outputFileSpec);
+ log.debug("Split: " + toSplit);
+ log.debug("Map parallelism: " + mapParallelism);
+ log.debug("Reduce parallelism: " + reduceParallelism);
}
public POMapreduce copy(){
@@ -199,6 +207,10 @@
toReduce = spec;
else
toReduce = toReduce.addSpec(spec);
+ }
+
+ public void visit(POVisitor v) {
+ v.visitMapreduce(this);
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PORead.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PORead.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PORead.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PORead.java Tue Jan 22 13:17:12 2008
@@ -22,7 +22,6 @@
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.Datum;
public class PORead extends PhysicalOperator {
@@ -31,7 +30,7 @@
*/
private static final long serialVersionUID = 1L;
DataBag bag;
- Iterator<Datum> it;
+ Iterator<Tuple> it;
public PORead(DataBag bagIn, int outputType) {
super(outputType);
@@ -47,7 +46,7 @@
if (continueFromLast){
throw new RuntimeException("LOReads should not occur in continuous plans");
}
- it = bag.content();
+ it = bag.iterator();
return true;
}
@@ -55,9 +54,13 @@
@Override
public Tuple getNext() throws IOException {
if (it.hasNext())
- return (Tuple)it.next();
+ return it.next();
else
return null;
+ }
+
+ public void visit(POVisitor v) {
+ v.visitRead(this);
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSort.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSort.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSort.java Tue Jan 22 13:17:12 2008
@@ -24,14 +24,13 @@
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.Datum;
import org.apache.pig.impl.eval.EvalSpec;
public class POSort extends PhysicalOperator {
static final long serialVersionUID = 1L;
EvalSpec sortSpec;
- transient Iterator<Datum> iter;
+ transient Iterator<Tuple> iter;
public POSort(EvalSpec sortSpec, int outputType) {
@@ -44,24 +43,27 @@
public boolean open(boolean continueFromLast) throws IOException {
if (!super.open(continueFromLast))
return false;
- DataBag bag =
- BagFactory.getInstance().getNewBag(Datum.DataType.TUPLE);
+ DataBag bag = BagFactory.getInstance().newSortedBag(sortSpec);
- bag.sort(sortSpec);
Tuple t;
while((t = inputs[0].getNext())!=null){
bag.add(t);
}
- iter = bag.content();
+ iter = bag.iterator();
return true;
}
@Override
public Tuple getNext() throws IOException {
if (iter.hasNext())
- return (Tuple)iter.next();
+ return iter.next();
else
return null;
}
+
+ @Override
+ public void visit(POVisitor v) {
+ v.visitSort(this);
+ }
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSplitMaster.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSplitMaster.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSplitMaster.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSplitMaster.java Tue Jan 22 13:17:12 2008
@@ -109,4 +109,9 @@
}
}
*/
+
+ public void visit(POVisitor v) {
+ v.visitSplitMaster(this);
+ }
+
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSplitSlave.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSplitSlave.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSplitSlave.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSplitSlave.java Tue Jan 22 13:17:12 2008
@@ -42,4 +42,8 @@
return master.slaveGetNext(this);
}
+ public void visit(POVisitor v) {
+ v.visitSplit(this);
+ }
+
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POStore.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POStore.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POStore.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POStore.java Tue Jan 22 13:17:12 2008
@@ -21,8 +21,8 @@
import org.apache.pig.StoreFunc;
import org.apache.pig.data.DataBag;
+import org.apache.pig.data.BagFactory;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.Datum;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.io.PigFile;
@@ -63,7 +63,7 @@
@Override
public Tuple getNext() throws IOException {
// get all tuples from input, and store them.
- DataBag b = new DataBag(Datum.DataType.TUPLE);
+ DataBag b = BagFactory.getInstance().newDefaultBag();
Tuple t;
while ((t = (Tuple) inputs[0].getNext()) != null) {
b.add(t);
@@ -88,6 +88,11 @@
new RuntimeException().printStackTrace();
System.exit(1);
return -1;
+ }
+
+ @Override
+ public void visit(POVisitor v) {
+ v.visitStore(this);
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POUnion.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POUnion.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POUnion.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POUnion.java Tue Jan 22 13:17:12 2008
@@ -72,4 +72,8 @@
return null;
}
+ public void visit(POVisitor v) {
+ v.visitUnion(this);
+ }
+
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java Tue Jan 22 13:17:12 2008
@@ -71,4 +71,6 @@
public int getOutputType(){
return outputType;
}
+
+ public abstract void visit(POVisitor v);
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalPlan.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalPlan.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalPlan.java Tue Jan 22 13:17:12 2008
@@ -21,7 +21,7 @@
import java.util.Map;
import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Datum;
+import org.apache.pig.data.BagFactory;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
@@ -34,7 +34,7 @@
}
public DataBag exec(boolean continueFromLast) throws IOException {
- DataBag results = new DataBag(Datum.DataType.TUPLE);
+ DataBag results = BagFactory.getInstance().newDefaultBag();
root.open(continueFromLast);
Tuple t;
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/util/DataBuffer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/util/DataBuffer.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/util/DataBuffer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/util/DataBuffer.java Tue Jan 22 13:17:12 2008
@@ -19,7 +19,6 @@
import java.util.*;
-import org.apache.pig.data.Datum;
import org.apache.pig.impl.eval.collector.DataCollector;
@@ -30,14 +29,14 @@
super(null);
}
- List<Datum> buf = Collections.synchronizedList(new LinkedList<Datum>());
+ List<Object> buf = Collections.synchronizedList(new LinkedList<Object>());
@Override
- public void add(Datum d){
+ public void add(Object d){
if (d != null) buf.add(d);
}
- public Datum removeFirst(){
+ public Object removeFirst(){
if (buf.isEmpty())
return null;
else
@@ -48,8 +47,8 @@
* This is a sequence we want to do frequently to accomodate the simple eval case, i.e., cases
* where we know that running an eval spec one item should produce one and only one item.
*/
- public Datum removeFirstAndAssertEmpty(){
- Datum d;
+ public Object removeFirstAndAssertEmpty(){
+ Object d;
if (isStale() || (d = removeFirst()) == null){
throw new RuntimeException("Simple eval used but buffer found to be empty or stale");
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/util/JarManager.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/util/JarManager.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/util/JarManager.java Tue Jan 22 13:17:12 2008
@@ -168,17 +168,29 @@
*/
private static void mergeJar(JarOutputStream jarFile, String jar, String prefix, Map<String, String> contents)
throws FileNotFoundException, IOException {
- JarInputStream jis = new JarInputStream(new FileInputStream(jar));
+ JarInputStream jarInput = new JarInputStream(new FileInputStream(jar));
+
+ mergeJar(jarFile, jarInput, prefix, contents);
+ }
+
+ private static void mergeJar(JarOutputStream jarFile, URL jar, String prefix, Map<String, String> contents)
+ throws FileNotFoundException, IOException {
+ JarInputStream jarInput = new JarInputStream(jar.openStream());
+
+ mergeJar(jarFile, jarInput, prefix, contents);
+ }
+
+ private static void mergeJar(JarOutputStream jarFile, JarInputStream jarInput, String prefix, Map<String, String> contents)
+ throws FileNotFoundException, IOException {
JarEntry entry;
- while ((entry = jis.getNextJarEntry()) != null) {
+ while ((entry = jarInput.getNextJarEntry()) != null) {
if (prefix != null && !entry.getName().startsWith(prefix)) {
continue;
}
- addStream(jarFile, entry.getName(), jis, contents);
+ addStream(jarFile, entry.getName(), jarInput, contents);
}
}
-
- /**
+ /**
* Adds a stream to a Jar file.
*
* @param os
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/util/PigLogger.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/util/PigLogger.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/util/PigLogger.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/util/PigLogger.java Tue Jan 22 13:17:12 2008
@@ -36,7 +36,6 @@
{
if (mLogger == null) {
mLogger = Logger.getLogger("org.apache.pig");
- mLogger.setAdditivity(false);
}
return mLogger;
}
Propchange: incubator/pig/branches/types/src/org/apache/pig/tools/grunt/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Jan 22 13:17:12 2008
@@ -0,0 +1,7 @@
+
+TokenMgrError.java
+Token.java
+SimpleCharStream.java
+ParseException.java
+GruntParserTokenManager.java
+GruntParserConstants.java
Propchange: incubator/pig/branches/types/src/org/apache/pig/tools/pigscript/parser/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Jan 22 13:17:12 2008
@@ -0,0 +1,8 @@
+
+TokenMgrError.java
+Token.java
+SimpleCharStream.java
+PigScriptParserTokenManager.java
+PigScriptParserConstants.java
+PigScriptParser.java
+ParseException.java
Propchange: incubator/pig/branches/types/test/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Jan 22 13:17:12 2008
@@ -0,0 +1,2 @@
+
+reports
Added: incubator/pig/branches/types/test/org/apache/pig/test/NonDefaultBagFactory.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/NonDefaultBagFactory.java?rev=614325&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/NonDefaultBagFactory.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/NonDefaultBagFactory.java Tue Jan 22 13:17:12 2008
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import org.apache.pig.data.*;
+import org.apache.pig.impl.eval.EvalSpec;
+
+// Test data bag factory, for testing that we can properly provide a
+// non-default bag factory.
+public class NonDefaultBagFactory extends BagFactory {
+ public DataBag newDefaultBag() { return null; }
+ public DataBag newSortedBag(EvalSpec sortSpec) { return null; }
+ public DataBag newDistinctBag() { return null; }
+
+ public NonDefaultBagFactory() { super(); }
+}
+
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java Tue Jan 22 13:17:12 2008
@@ -33,6 +33,36 @@
public class TestAlgebraicEval extends TestCase {
private String initString = "mapreduce";
+
+ @Test
+ public void testGroupCountWithMultipleFields() throws Exception {
+ int LOOP_COUNT = 1024;
+ PigServer pig = new PigServer(initString);
+ File tmpFile = File.createTempFile("test", "txt");
+ PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+ for(int i = 0; i < LOOP_COUNT; i++) {
+ for(int j=0; j< LOOP_COUNT; j++) {
+ ps.println(i + "\t" + i + "\t" + j%2);
+ }
+ }
+ ps.close();
+ String query = "myid = foreach (group (load 'file:" + tmpFile + "') all) generate group, COUNT($1) ;";
+ System.out.println(query);
+ pig.registerQuery(" a = group (load 'file:" + tmpFile + "') by ($0,$1);");
+ pig.registerQuery("b = foreach a generate flatten(group), SUM($1.$2);");
+ Iterator<Tuple> it = pig.openIterator("b");
+ tmpFile.delete();
+ int count = 0;
+ while(it.hasNext()){
+ int sum = it.next().getAtomField(2).numval().intValue();
+ assertEquals(LOOP_COUNT/2, sum);
+ count++;
+ }
+ assertEquals(count, LOOP_COUNT);
+ }
+
+
+
@Test
public void testSimpleCount() throws Exception {
int LOOP_COUNT = 1024;
@@ -72,6 +102,8 @@
Double count = t.getAtomField(1).numval();
assertEquals(count, (double)LOOP_COUNT);
}
+
+
@Test
public void testGroupReorderCount() throws Exception {
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java Tue Jan 22 13:17:12 2008
@@ -62,6 +62,48 @@
}
@Test
+ public void testAVGInitial() throws Exception {
+ int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+
+ EvalFunc<Tuple> avg = new AVG.Initial();
+ Tuple tup = Util.loadNestTuple(new Tuple(1), input);
+ Tuple output = new Tuple();
+ avg.exec(tup, output);
+
+ assertEquals("Expected sum to be 55.0", 55.0,
+ output.getAtomField(0).numval());
+ assertEquals("Expected count to be 10", 10,
+ output.getAtomField(1).longVal());
+ }
+
+ @Test
+ public void testAVGFinal() throws Exception {
+ Tuple t1 = new Tuple(2);
+ t1.setField(0, 55.0);
+ t1.setField(1, 10);
+ Tuple t2 = new Tuple(2);
+ t2.setField(0, 28.0);
+ t2.setField(1, 7);
+ Tuple t3 = new Tuple(2);
+ t3.setField(0, 82.0);
+ t3.setField(1, 17);
+ DataBag bag = BagFactory.getInstance().newDefaultBag();
+ bag.add(t1);
+ bag.add(t2);
+ bag.add(t3);
+
+ Tuple tup = new Tuple(bag);
+
+ EvalFunc<DataAtom> avg = new AVG.Final();
+ DataAtom output = new DataAtom();
+ avg.exec(tup, output);
+
+ assertEquals("Expected average to be 4.852941176470588",
+ 4.852941176470588, output.numval());
+ }
+
+
+ @Test
public void testCOUNT() throws Exception {
int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
double expected = input.length;
@@ -91,7 +133,7 @@
count.exec(tup,output);
assertTrue(output.numval() == 0);
- map.put("a", "a");
+ map.put("a", new DataAtom("a"));
assertFalse(isEmpty.exec(tup));
count.exec(tup,output);
@@ -105,7 +147,32 @@
assertTrue(output.numval() == 2);
}
-
+
+ @Test
+ public void testCOUNTInitial() throws Exception {
+ int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+
+ EvalFunc<Tuple> count = new COUNT.Initial();
+ Tuple tup = Util.loadNestTuple(new Tuple(1), input);
+ Tuple output = new Tuple();
+ count.exec(tup, output);
+
+ assertEquals("Expected count to be 10", 10,
+ output.getAtomField(0).longVal());
+ }
+
+ @Test
+ public void testCOUNTFinal() throws Exception {
+ int input[] = { 23, 38, 39 };
+ Tuple tup = Util.loadNestTuple(new Tuple(1), input);
+
+ EvalFunc<DataAtom> count = new COUNT.Final();
+ DataAtom output = new DataAtom();
+ count.exec(tup, output);
+
+ assertEquals("Expected count to be 100", 100, output.longVal());
+ }
+
@Test
public void testSUM() throws Exception {
int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
@@ -121,6 +188,108 @@
assertTrue(actual == expected);
}
+ @Test
+ public void testSUMInitial() throws Exception {
+ int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+
+ EvalFunc<Tuple> sum = new SUM.Initial();
+ Tuple tup = Util.loadNestTuple(new Tuple(1), input);
+ Tuple output = new Tuple();
+ sum.exec(tup, output);
+
+ assertEquals("Expected sum to be 55.0", 55.0,
+ output.getAtomField(0).numval());
+ }
+
+ @Test
+ public void testSUMFinal() throws Exception {
+ int input[] = { 23, 38, 39 };
+ Tuple tup = Util.loadNestTuple(new Tuple(1), input);
+
+ EvalFunc<DataAtom> sum = new SUM.Final();
+ DataAtom output = new DataAtom();
+ sum.exec(tup, output);
+
+ assertEquals("Expected sum to be 100.0", 100.0, output.numval());
+ }
+
+ @Test
+ public void testMIN() throws Exception {
+ int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+
+ EvalFunc<DataAtom> min = new MIN();
+ Tuple tup = Util.loadNestTuple(new Tuple(1), input);
+ DataAtom output = new DataAtom();
+ min.exec(tup, output);
+
+ assertEquals("Expected min to be 1.0", 1.0, output.numval());
+ }
+
+
+ @Test
+ public void testMINInitial() throws Exception {
+ int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+
+ EvalFunc<Tuple> min = new MIN.Initial();
+ Tuple tup = Util.loadNestTuple(new Tuple(1), input);
+ Tuple output = new Tuple();
+ min.exec(tup, output);
+
+ assertEquals("Expected min to be 1.0", 1.0,
+ output.getAtomField(0).numval());
+ }
+
+ @Test
+ public void testMINFinal() throws Exception {
+ int input[] = { 23, 38, 39 };
+ Tuple tup = Util.loadNestTuple(new Tuple(1), input);
+
+ EvalFunc<DataAtom> min = new MIN.Final();
+ DataAtom output = new DataAtom();
+ min.exec(tup, output);
+
+ assertEquals("Expected sum to be 23.0", 23.0, output.numval());
+ }
+
+ @Test
+ public void testMAX() throws Exception {
+ int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+
+ EvalFunc<DataAtom> max = new MAX();
+ Tuple tup = Util.loadNestTuple(new Tuple(1), input);
+ DataAtom output = new DataAtom();
+ max.exec(tup, output);
+
+ assertEquals("Expected max to be 10.0", 10.0, output.numval());
+ }
+
+
+ @Test
+ public void testMAXInitial() throws Exception {
+ int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+
+ EvalFunc<Tuple> max = new MAX.Initial();
+ Tuple tup = Util.loadNestTuple(new Tuple(1), input);
+ Tuple output = new Tuple();
+ max.exec(tup, output);
+
+ assertEquals("Expected max to be 10.0", 10.0,
+ output.getAtomField(0).numval());
+ }
+
+ @Test
+ public void testMAXFinal() throws Exception {
+ int input[] = { 23, 38, 39 };
+ Tuple tup = Util.loadNestTuple(new Tuple(1), input);
+
+ EvalFunc<DataAtom> max = new MAX.Final();
+ DataAtom output = new DataAtom();
+ max.exec(tup, output);
+
+ assertEquals("Expected sum to be 39.0", 39.0, output.numval());
+ }
+
+
// Builtin APPLY Functions
// ========================
@@ -159,6 +328,7 @@
assertTrue(f3.arity() == arity3);
}
+ /*
@Test
public void testLFBin() throws Exception {
@@ -172,8 +342,7 @@
t2.setField(0,a);
Tuple t3 = new Tuple(1);
t3.setField(0, b);
- DataBag bag =
- BagFactory.getInstance().getNewBigBag(Datum.DataType.TUPLE);
+ DataBag bag = BagFactory.getInstance().getNewBigBag();
bag.add(t2);
bag.add(t3);
Tuple t4 = new Tuple(2);
@@ -192,8 +361,7 @@
t6.setField(0,c);
Tuple t7 = new Tuple(1);
t7.setField(0, d);
- DataBag bag2 =
- BagFactory.getInstance().getNewBigBag(Datum.DataType.TUPLE);
+ DataBag bag2 = BagFactory.getInstance().getNewBigBag();
for(int i = 0; i < 10; i ++) {
bag2.add(t6);
bag2.add(t7);
@@ -224,6 +392,7 @@
assertTrue(r1.equals(t1));
assertTrue(r2.equals(t5));
}
+ */
@Test
@@ -316,12 +485,8 @@
for (int i=0; i< numTimes; i++){
Tuple t = iter.next();
-
- Tuple t0 = (Tuple)t.getBagField(0).content().next();
- Tuple t1 = (Tuple)t.getBagField(1).content().next();
- assertEquals(i+"AA", t0.getAtomField(0).strval());
- assertEquals(i+"BB", t1.getAtomField(0).strval());
-
+ assertEquals(i+"AA", t.getBagField(0).iterator().next().getAtomField(0).strval());
+ assertEquals(i+"BB", t.getBagField(1).iterator().next().getAtomField(0).strval());
}
assertFalse(iter.hasNext());
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestDataBag.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestDataBag.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestDataBag.java Tue Jan 22 13:17:12 2008
@@ -17,698 +17,728 @@
*/
package org.apache.pig.test;
-import java.io.DataInput;
-import java.io.DataOutput;
+/*
import java.io.DataInputStream;
import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileInputStream;
import java.io.IOException;
-
-import java.util.List;
-import java.util.ArrayList;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
import java.util.Iterator;
+import java.util.Random;
+*/
+
+import java.util.*;
+import java.io.IOException;
import org.junit.Test;
import org.apache.pig.data.*;
+import org.apache.pig.impl.eval.*;
+import org.apache.pig.impl.util.Spillable;
/**
- * This class will exercise the data bag data type.
+ * This class will exercise the basic Pig data model and members. It tests for proper behavior in
+ * assignment and comparison, as well as function application.
*
- * @author gates
+ * @author dnm
*/
-public class TestDataBag extends junit.framework.TestCase
-{
-
-public void testDefaultConstructor() throws Exception
-{
- DataBag bag = new DataBag(Datum.DataType.INT);
-
- assertEquals("getType", Datum.DataType.BAG, bag.getType());
- assertFalse("is null", bag.isNull());
- assertTrue("bag of ints", bag.bagOf() == Datum.DataType.INT);
-
- assertEquals("Default constructor size before", 0, bag.size());
- DataInteger val = new DataInteger(42);
-
- bag.add(val);
- assertEquals("Default constructor size after", 1, bag.size());
-
- Iterator<Datum> i = bag.content();
- Datum d = i.next();
-
- assertTrue("should be an integer", d.getType() == Datum.DataType.INT);
- assertNotNull("get with entry in bag", d);
- assertEquals("value of val", 42, ((DataInteger)d).get());
-}
-
-public void testListConstructor() throws Exception
-{
- List<Datum> list = new ArrayList<Datum>();
- list.add(new DataInteger(10));
- list.add(new DataInteger(11));
- list.add(new DataInteger(9));
-
- DataBag bag = new DataBag(list);
-
- assertEquals("list construct size", 3L, bag.size());
-
- Iterator<Datum> i = bag.content();
- Datum d = i.next();
- assertNotNull("get first entry in bag", d);
- assertTrue("should be an integer", d.getType() == Datum.DataType.INT);
- assertEquals("first value of val", 10, ((DataInteger)d).get());
- d = i.next();
- assertNotNull("get second entry in bag", d);
- assertTrue("should be an integer", d.getType() == Datum.DataType.INT);
- assertEquals("second value of val", 11, ((DataInteger)d).get());
- d = i.next();
- assertNotNull("get third entry in bag", d);
- assertTrue("should be an integer", d.getType() == Datum.DataType.INT);
- assertEquals("third value of val", 9, ((DataInteger)d).get());
- assertFalse("bag should be exhausted now", i.hasNext());
-
- bag.add(new DataInteger(4));
- i = bag.content();
- d = i.next();
- d = i.next();
- d = i.next();
- d = i.next();
- assertNotNull("get fourth entry in bag", d);
- assertTrue("should be an integer", d.getType() == Datum.DataType.INT);
- assertEquals("fourth value of val", 4, ((DataInteger)d).get());
- assertFalse("bag should be exhausted now", i.hasNext());
-}
-
-
-public void testBigBag() throws Exception
-{
- DataBag bag = new DataBag(Datum.DataType.INT);
-
- for (int i = 0; i < 10000; i++) {
- bag.add(new DataInteger(i));
- }
-
- assertEquals("big size after loading", 10000, bag.size());
-
- Iterator<Datum> i = bag.content();
- for (int j = 0; j < 10000; j++) {
- assertTrue("should still have data", i.hasNext());
- Datum val = i.next();
- assertTrue("should be an integer", val.getType() == Datum.DataType.INT);
- assertEquals("value of val", j, ((DataInteger)val).get());
- }
- assertFalse("bag should be exhausted now", i.hasNext());
-}
-
-public void testToString() throws Exception
-{
- DataBag bag = new DataBag(Datum.DataType.INT);
-
- bag.add(new DataInteger(1));
- bag.add(new DataInteger(1));
- bag.add(new DataInteger(3));
-
- assertEquals("toString", "{1, 1, 3}", bag.toString());
-}
-
-public void testEquals() throws Exception
-{
- DataBag bag1 = new DataBag(Datum.DataType.INT);
- DataBag bag2 = new DataBag(Datum.DataType.INT);
-
- bag1.add(new DataInteger(3));
- bag2.add(new DataInteger(3));
-
- assertFalse("different object", bag1.equals(new String()));
-
- assertTrue("same data", bag1.equals(bag2));
-
- bag2 = new DataBag(Datum.DataType.INT);
- bag2.add(new DataInteger(4));
- assertFalse("different data", bag1.equals(bag2));
-
- bag2 = new DataBag(Datum.DataType.INT);
- bag2.add(new DataInteger(3));
- bag2.add(new DataInteger(3));
- assertFalse("different size", bag1.equals(bag2));
-
- bag2 = new DataBag(Datum.DataType.LONG);
- bag2.add(new DataLong(3));
- assertFalse("different type of bag", bag1.equals(bag2));
-}
-
-public void testCompareTo() throws Exception
-{
- DataBag bag1 = new DataBag(Datum.DataType.INT);
- DataBag bag2 = new DataBag(Datum.DataType.INT);
-
- bag1.add(new DataInteger(3));
- bag2.add(new DataInteger(3));
-
- assertEquals("different object less than", -1, bag1.compareTo(new String()));
-
- Tuple t = new Tuple();
- assertTrue("less than tuple", bag1.compareTo(t) < 0);
- DataMap map = new DataMap();
- assertTrue("less than map", bag1.compareTo(map) < 0);
- DataLong l = new DataLong();
- assertTrue("less than long", bag1.compareTo(l) < 0);
- DataFloat f = new DataFloat();
- assertTrue("less than float", bag1.compareTo(f) < 0);
- DataDouble d = new DataDouble();
- assertTrue("less than double", bag1.compareTo(d) < 0);
- DataUnknown unk = new DataUnknown();
- assertTrue("less than unknown", bag1.compareTo(unk) < 0);
- DataCharArrayUtf16 utf16 = new DataCharArrayUtf16();
- assertTrue("less than utf16", bag1.compareTo(utf16) < 0);
-
- assertEquals("same data equal", 0, bag1.compareTo(bag2));
-
- bag2 = new DataBag(Datum.DataType.INT);
- bag2.add(new DataInteger(2));
- assertEquals("greater than bag with lesser value", 1, bag1.compareTo(bag2));
-
- bag2 = new DataBag(Datum.DataType.INT);
- bag2.add(new DataInteger(4));
- assertEquals("less than bag with greater value", -1, bag1.compareTo(bag2));
-
- bag2 = new DataBag(Datum.DataType.INT);
- bag2.add(new DataInteger(3));
- bag2.add(new DataInteger(4));
- assertEquals("less than bigger bag", -1, bag1.compareTo(bag2));
-
- bag2 = new DataBag(Datum.DataType.INT);
- assertEquals("greater than smaller bag", 1, bag1.compareTo(bag2));
-
- bag2 = new DataBag(Datum.DataType.LONG);
- bag2.add(new DataLong(3));
- assertEquals("different type of bag", -1, bag1.compareTo(bag2));
-}
-
-
-public void testWriteReadUnknown() throws Exception
-{
- DataBag before = new DataBag(Datum.DataType.UNKNOWN);
-
- String s = new String("zzz");
- before.add(new DataUnknown(s.getBytes()));
- s = new String("yyy");
- before.add(new DataUnknown(s.getBytes()));
- s = new String("xxx");
- before.add(new DataUnknown(s.getBytes()));
-
- File file = null;
- file = File.createTempFile("DataBagUnknown", "put");
- FileOutputStream fos = new FileOutputStream(file);
- DataOutput out = new DataOutputStream(fos);
- before.write(out);
- fos.close();
-
- FileInputStream fis = new FileInputStream(file);
- DataInput in = new DataInputStream(fis);
- Datum a = DatumImpl.readDatum(in);
-
- assertTrue("isa DataBag", a instanceof DataBag);
-
- DataBag after = (DataBag)a;
-
- assertTrue("bag of unknowns", after.bagOf() == Datum.DataType.UNKNOWN);
- assertEquals("after read, size", 3, after.size());
-
- Iterator<Datum> j = after.content();
-
- Datum valAfter = j.next();
- assertTrue("should be an unknown",
- valAfter.getType() == Datum.DataType.UNKNOWN);
- for (int i = 0; i < 3; i++) {
- assertEquals("value of valAfter", (byte)0x7a,
- ((DataUnknown)valAfter).get()[i]);
- }
-
- valAfter = j.next();
- assertTrue("should be an unknown",
- valAfter.getType() == Datum.DataType.UNKNOWN);
- for (int i = 0; i < 3; i++) {
- assertEquals("value of valAfter", (byte)0x79,
- ((DataUnknown)valAfter).get()[i]);
- }
-
- valAfter = j.next();
- assertTrue("should be an unknown",
- valAfter.getType() == Datum.DataType.UNKNOWN);
- for (int i = 0; i < 3; i++) {
- assertEquals("value of valAfter", (byte)0x78,
- ((DataUnknown)valAfter).get()[i]);
- }
-
- assertFalse("should have read all values in bag", j.hasNext());
-
- file.delete();
-}
-
-public void testWriteReadInt() throws Exception
-{
- DataBag before = new DataBag(Datum.DataType.INT);
-
- before.add(new DataInteger(99));
- before.add(new DataInteger(-98));
- before.add(new DataInteger(97));
-
- File file = null;
- file = File.createTempFile("DataBagInteger", "put");
- FileOutputStream fos = new FileOutputStream(file);
- DataOutput out = new DataOutputStream(fos);
- before.write(out);
- fos.close();
-
- FileInputStream fis = new FileInputStream(file);
- DataInput in = new DataInputStream(fis);
- Datum a = DatumImpl.readDatum(in);
-
- assertTrue("isa DataBag", a instanceof DataBag);
-
- DataBag after = (DataBag)a;
-
- assertTrue("bag of ints", after.bagOf() == Datum.DataType.INT);
-
- assertEquals("after read, size", 3, after.size());
-
- Iterator<Datum> j = after.content();
-
- Datum val = j.next();
- assertTrue("should be an integer", val.getType() == Datum.DataType.INT);
- assertEquals("value of valAfter", 99, ((DataInteger)val).get());
-
- val = j.next();
- assertTrue("should be an integer", val.getType() == Datum.DataType.INT);
- assertEquals("value of valAfter2", -98, ((DataInteger)val).get());
-
- val = j.next();
- assertTrue("should be an integer", val.getType() == Datum.DataType.INT);
- assertEquals("value of valAfter", 97, ((DataInteger)val).get());
-
- assertFalse("should have read all values in bag", j.hasNext());
-
- file.delete();
-}
-
-public void testWriteReadLong() throws Exception
-{
- DataBag before = new DataBag(Datum.DataType.LONG);
-
- before.add(new DataLong(99000000000L));
- before.add(new DataLong(-98L));
- before.add(new DataLong(97L));
-
- File file = null;
- file = File.createTempFile("DataBagLong", "put");
- FileOutputStream fos = new FileOutputStream(file);
- DataOutput out = new DataOutputStream(fos);
- before.write(out);
- fos.close();
-
- FileInputStream fis = new FileInputStream(file);
- DataInput in = new DataInputStream(fis);
- Datum a = DatumImpl.readDatum(in);
-
- assertTrue("isa DataBag", a instanceof DataBag);
-
- DataBag after = (DataBag)a;
-
- assertTrue("bag of longs", after.bagOf() == Datum.DataType.LONG);
- assertEquals("after read, size", 3, after.size());
-
- Iterator<Datum> j = after.content();
-
- Datum val = j.next();
- assertTrue("should be a long", val.getType() == Datum.DataType.LONG);
- assertEquals("value of valAfter", 99000000000L, ((DataLong)val).get());
-
- val = j.next();
- assertTrue("should be a long", val.getType() == Datum.DataType.LONG);
- assertEquals("value of valAfter2", -98L, ((DataLong)val).get());
-
- val = j.next();
- assertTrue("should be a long", val.getType() == Datum.DataType.LONG);
- assertEquals("value of valAfter", 97L, ((DataLong)val).get());
-
- assertFalse("should have read all values in bag", j.hasNext());
-
- file.delete();
-}
-
-public void testWriteReadFloat() throws Exception
-{
- DataBag before = new DataBag(Datum.DataType.FLOAT);
-
- before.add(new DataFloat(3.2e32f));
- before.add(new DataFloat(-9.929292e-29f));
- before.add(new DataFloat(97.0f));
-
- File file = null;
- file = File.createTempFile("DataBagFloat", "put");
- FileOutputStream fos = new FileOutputStream(file);
- DataOutput out = new DataOutputStream(fos);
- before.write(out);
- fos.close();
-
- FileInputStream fis = new FileInputStream(file);
- DataInput in = new DataInputStream(fis);
- Datum a = DatumImpl.readDatum(in);
-
- assertTrue("isa DataBag", a instanceof DataBag);
-
- DataBag after = (DataBag)a;
-
- assertTrue("bag of floats", after.bagOf() == Datum.DataType.FLOAT);
- assertEquals("after read, size", 3, after.size());
-
- Iterator<Datum> j = after.content();
-
- Datum val = j.next();
- assertTrue("should be a float", val.getType() == Datum.DataType.FLOAT);
- assertEquals("value of valAfter", 3.2e32f, ((DataFloat)val).get());
+public class TestDataBag extends junit.framework.TestCase {
- val = j.next();
- assertTrue("should be a float", val.getType() == Datum.DataType.FLOAT);
- assertEquals("value of valAfter2", -9.929292e-29f, ((DataFloat)val).get());
+ private Random rand = new Random();
- val = j.next();
- assertTrue("should be a float", val.getType() == Datum.DataType.FLOAT);
- assertEquals("value of valAfter", 97.0f, ((DataFloat)val).get());
+ private class TestMemoryManager {
+ ArrayList<Spillable> mManagedObjects = new ArrayList<Spillable>();
- assertFalse("should have read all values in bag", j.hasNext());
-
- file.delete();
-}
-
-public void testWriteReadDouble() throws Exception
-{
- DataBag before = new DataBag(Datum.DataType.DOUBLE);
-
- before.add(new DataDouble(3.2e132));
- before.add(new DataDouble(-9.929292e-129));
- before.add(new DataDouble(97.0));
-
- File file = null;
- file = File.createTempFile("DataBagDouble", "put");
- FileOutputStream fos = new FileOutputStream(file);
- DataOutput out = new DataOutputStream(fos);
- before.write(out);
- fos.close();
-
- FileInputStream fis = new FileInputStream(file);
- DataInput in = new DataInputStream(fis);
- Datum a = DatumImpl.readDatum(in);
-
- assertTrue("isa DataBag", a instanceof DataBag);
-
- DataBag after = (DataBag)a;
-
- assertTrue("bag of double", after.bagOf() == Datum.DataType.DOUBLE);
- assertEquals("after read, size", 3, after.size());
-
- Iterator<Datum> j = after.content();
-
- Datum val = j.next();
- assertTrue("should be a double", val.getType() == Datum.DataType.DOUBLE);
- assertEquals("value of valAfter", 3.2e132, ((DataDouble)val).get());
-
- val = j.next();
- assertTrue("should be a double", val.getType() == Datum.DataType.DOUBLE);
- assertEquals("value of valAfter2", -9.929292e-129, ((DataDouble)val).get());
-
- val = j.next();
- assertTrue("should be a double", val.getType() == Datum.DataType.DOUBLE);
- assertEquals("value of valAfter", 97.0, ((DataDouble)val).get());
-
- assertFalse("should have read all values in bag", j.hasNext());
-
- file.delete();
-}
-
-public void testWriteReadUtf16() throws Exception
-{
- DataBag before = new DataBag(Datum.DataType.CHARARRAY);
-
- before.add(new DataCharArrayUtf16("zzz"));
- before.add(new DataCharArrayUtf16("yyy"));
- before.add(new DataCharArrayUtf16("xxx"));
-
- File file = null;
- file = File.createTempFile("DataBagUtf16", "put");
- FileOutputStream fos = new FileOutputStream(file);
- DataOutput out = new DataOutputStream(fos);
- before.write(out);
- fos.close();
-
- FileInputStream fis = new FileInputStream(file);
- DataInput in = new DataInputStream(fis);
- Datum a = DatumImpl.readDatum(in);
-
- assertTrue("isa DataBag", a instanceof DataBag);
-
- DataBag after = (DataBag)a;
-
- assertTrue("bag of chararray", after.bagOf() == Datum.DataType.CHARARRAY);
- assertEquals("after read, size", 3, after.size());
-
- Iterator<Datum> j = after.content();
-
- Datum val = j.next();
- assertTrue("should be a chararray", val.getType() == Datum.DataType.CHARARRAY);
- assertTrue("encoding should be utf16",
- ((DataCharArray)val).getEncoding() == DataCharArray.Encoding.UTF16);
- assertEquals("value of valAfter", "zzz", ((DataCharArrayUtf16)val).get());
-
- val = j.next();
- assertTrue("should be a chararray", val.getType() == Datum.DataType.CHARARRAY);
- assertTrue("encoding should be utf16",
- ((DataCharArray)val).getEncoding() == DataCharArray.Encoding.UTF16);
- assertEquals("value of valAfter2", "yyy", ((DataCharArrayUtf16)val).get());
-
- val = j.next();
- assertTrue("should be a chararray", val.getType() == Datum.DataType.CHARARRAY);
- assertTrue("encoding should be utf16",
- ((DataCharArray)val).getEncoding() == DataCharArray.Encoding.UTF16);
- assertEquals("value of valAfter", "xxx", ((DataCharArrayUtf16)val).get());
-
- assertFalse("should have read all values in bag", j.hasNext());
-
- file.delete();
-}
+ public void register(Spillable s) {
+ mManagedObjects.add(s);
+ }
+
+ public void forceSpill() throws IOException {
+ Iterator<Spillable> i = mManagedObjects.iterator();
+ while (i.hasNext()) i.next().spill();
+ }
+ }
+
+ // Stands in for the regular bag factory so that every bag it creates is
+ // registered with the local TestMemoryManager and can be force-spilled.
+ private class LocalBagFactory {
+ TestMemoryManager mMemMgr;
+
+ public LocalBagFactory(TestMemoryManager mgr) {
+ mMemMgr = mgr;
+ }
+
+ public DataBag newDefaultBag() {
+ DataBag bag = new DefaultDataBag();
+ mMemMgr.register(bag);
+ return bag;
+ }
+
+ public DataBag newSortedBag(EvalSpec sortSpec) {
+ DataBag bag = new SortedDataBag(sortSpec);
+ mMemMgr.register(bag);
+ return bag;
+ }
+
+ public DataBag newDistinctBag() {
+ DataBag bag = new DistinctDataBag();
+ mMemMgr.register(bag);
+ return bag;
+ }
+ }
+
+ // Test reading and writing default from memory, no spills.
+ @Test
+ public void testDefaultInMemory() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newDefaultBag();
+ ArrayList<Tuple> rightAnswer = new ArrayList<Tuple>(10);
+
+ // Write tuples into both
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(i));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+ Iterator<Tuple> rIter = rightAnswer.iterator();
+
+ while (rIter.hasNext()) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), rIter.next());
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+ // Test reading and writing default from file with one spill
+ @Test
+ public void testDefaultSingleSpill() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newDefaultBag();
+ ArrayList<Tuple> rightAnswer = new ArrayList<Tuple>(10);
+
+ // Write tuples into both
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(i));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+ mgr.forceSpill();
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+ Iterator<Tuple> rIter = rightAnswer.iterator();
+
+ while (rIter.hasNext()) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), rIter.next());
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+ // Test reading and writing default from file with three spills
+ @Test
+ public void testDefaultTripleSpill() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newDefaultBag();
+ ArrayList<Tuple> rightAnswer = new ArrayList<Tuple>(30);
+
+ // Write tuples into both
+ for (int j = 0; j < 3; j++) {
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(i));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+ mgr.forceSpill();
+ }
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+ Iterator<Tuple> rIter = rightAnswer.iterator();
+
+ while (rIter.hasNext()) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), rIter.next());
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+ // Test reading with some in file, some in memory.
+ @Test
+ public void testDefaultInMemInFile() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newDefaultBag();
+ ArrayList<Tuple> rightAnswer = new ArrayList<Tuple>(20);
+
+ // Write tuples into both
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(i));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+ mgr.forceSpill();
+
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(i));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+ Iterator<Tuple> rIter = rightAnswer.iterator();
+
+ while (rIter.hasNext()) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), rIter.next());
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+ // Test reading with a spill happening in the middle of the read.
+ @Test
+ public void testDefaultSpillDuringRead() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newDefaultBag();
+ ArrayList<Tuple> rightAnswer = new ArrayList<Tuple>(20);
+
+ // Write tuples into both
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(i));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+ mgr.forceSpill();
+
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(i));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+ Iterator<Tuple> rIter = rightAnswer.iterator();
+
+ for (int i = 0; i < 15; i++) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), rIter.next());
+ }
+
+ mgr.forceSpill();
+
+ while (rIter.hasNext()) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), rIter.next());
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+ // Test reading and writing sorted from memory, no spills.
+ @Test
+ public void testSortedInMemory() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newSortedBag(null);
+ PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(10);
+
+ // Write tuples into both
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+
+ Tuple t;
+ while ((t = rightAnswer.poll()) != null) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), t);
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+ // Test reading and writing sorted from file with one spill
+ @Test
+ public void testSortedSingleSpill() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newSortedBag(null);
+ PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(10);
+
+ // Write tuples into both
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+ mgr.forceSpill();
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+ Tuple t;
+ while ((t = rightAnswer.poll()) != null) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), t);
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+ // Test reading and writing sorted from file with three spills
+ @Test
+ public void testSortedTripleSpill() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newSortedBag(null);
+ PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(30);
+
+ // Write tuples into both
+ for (int j = 0; j < 3; j++) {
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+ mgr.forceSpill();
+ }
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+
+ Tuple t;
+ while ((t = rightAnswer.poll()) != null) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), t);
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+ // Test reading with some in file, some in memory.
+ @Test
+ public void testSortedInMemInFile() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newSortedBag(null);
+ PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(20);
+
+ // Write tuples into both
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+ mgr.forceSpill();
+
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+ Tuple t;
+ while ((t = rightAnswer.poll()) != null) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), t);
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+ // Test reading with a spill happening in the middle of the read.
+ @Test
+ public void testSortedSpillDuringRead() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newSortedBag(null);
+ PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(20);
+
+ // Write tuples into both
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+ mgr.forceSpill();
+
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+
+ for (int i = 0; i < 15; i++) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), rightAnswer.poll());
+ }
+
+ mgr.forceSpill();
+
+ Tuple t;
+ while ((t = rightAnswer.poll()) != null) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), t);
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+ // Test reading with first spill happening in the middle of the read.
+ @Test
+ public void testSortedFirstSpillDuringRead() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newSortedBag(null);
+ PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(20);
+
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+
+ for (int i = 0; i < 5; i++) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), rightAnswer.poll());
+ }
+
+ mgr.forceSpill();
+
+ Tuple t;
+ while ((t = rightAnswer.poll()) != null) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), t);
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+ // Test reading and writing sorted file with so many spills it requires
+ // premerge.
+ @Test
+ public void testSortedPreMerge() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newSortedBag(null);
+ PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(30);
+
+ // Write tuples into both
+ for (int j = 0; j < 373; j++) {
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+ mgr.forceSpill();
+ }
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+
+ Tuple t;
+ while ((t = rightAnswer.poll()) != null) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), t);
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+ // Test reading and writing distinct from memory, no spills.
+ @Test
+ public void testDistinctInMemory() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newDistinctBag();
+ TreeSet<Tuple> rightAnswer = new TreeSet<Tuple>();
+
+ // Write tuples into both
+ for (int i = 0; i < 50; i++) {
+ Tuple t = new Tuple(new DataAtom(rand.nextInt() % 5));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+ Iterator<Tuple> rIter = rightAnswer.iterator();
+
+ while (rIter.hasNext()) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), rIter.next());
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+ // Test reading and writing distinct from file with one spill
+ @Test
+ public void testDistinctSingleSpill() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newDistinctBag();
+ TreeSet<Tuple> rightAnswer = new TreeSet<Tuple>();
+
+ // Write tuples into both
+ for (int i = 0; i < 50; i++) {
+ Tuple t = new Tuple(new DataAtom(rand.nextInt() % 5));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+ mgr.forceSpill();
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+ Iterator<Tuple> rIter = rightAnswer.iterator();
+
+ while (rIter.hasNext()) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), rIter.next());
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+ // Test reading and writing distinct from file with three spills
+ @Test
+ public void testDistinctTripleSpill() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newDistinctBag();
+ TreeSet<Tuple> rightAnswer = new TreeSet<Tuple>();
+
+ // Write tuples into both
+ for (int j = 0; j < 3; j++) {
+ for (int i = 0; i < 50; i++) {
+ Tuple t = new Tuple(new DataAtom(rand.nextInt() % 5));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+ mgr.forceSpill();
+ }
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+ Iterator<Tuple> rIter = rightAnswer.iterator();
+
+ while (rIter.hasNext()) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), rIter.next());
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+    // Test reading a distinct bag with some tuples spilled to file and some still in memory.
+    @Test
+    public void testDistinctInMemInFile() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newDistinctBag();
+        TreeSet<Tuple> rightAnswer = new TreeSet<Tuple>();
+
+        // Write a first batch into both, then force it to spill.
+        for (int i = 0; i < 50; i++) {
+            Tuple t = new Tuple(new DataAtom(rand.nextInt() % 5));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+        mgr.forceSpill();
+
+        for (int i = 0; i < 50; i++) {
+            Tuple t = new Tuple(new DataAtom(i));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+
+        // Read tuples back; the bag must match the TreeSet's sorted, duplicate-free iteration.
+        Iterator<Tuple> bIter = b.iterator();
+        Iterator<Tuple> rIter = rightAnswer.iterator();
+
+        while (rIter.hasNext()) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            // JUnit's order is (message, expected, actual); rightAnswer is the expectation.
+            assertEquals("tuples should be the same", rIter.next(), bIter.next());
+        }
+
+        assertFalse("right answer ran out of tuples before the bag", bIter.hasNext());
+    }
+
+    // Test reading a distinct bag when a spill is forced in the middle of the read.
+    @Test
+    public void testDistinctSpillDuringRead() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newDistinctBag();
+        TreeSet<Tuple> rightAnswer = new TreeSet<Tuple>();
+
+        // Write a first batch into both, then force it to spill.
+        for (int i = 0; i < 50; i++) {
+            Tuple t = new Tuple(new DataAtom(rand.nextInt() % 5));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+        mgr.forceSpill();
+
+        for (int i = 0; i < 50; i++) {
+            Tuple t = new Tuple(new DataAtom(i));
+            b.add(t);
+            rightAnswer.add(t);
+        }
+
+        // Read the first five tuples back before the iterators are disturbed by a spill.
+        Iterator<Tuple> bIter = b.iterator();
+        Iterator<Tuple> rIter = rightAnswer.iterator();
+
+        for (int i = 0; i < 5; i++) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            // JUnit's order is (message, expected, actual); rightAnswer is the expectation.
+            assertEquals("tuples should be the same", rIter.next(), bIter.next());
+        }
+
+        mgr.forceSpill();
+
+        while (rIter.hasNext()) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            assertEquals("tuples should be the same", rIter.next(), bIter.next());
+        }
+
+        assertFalse("right answer ran out of tuples before the bag", bIter.hasNext());
+    }
+
+    // Test reading and writing distinct from file with enough spills to
+    // force a pre-merge
+    @Test
+    public void testDistinctPreMerge() throws Exception {
+        TestMemoryManager mgr = new TestMemoryManager();
+        LocalBagFactory factory = new LocalBagFactory(mgr);
+        DataBag b = factory.newDistinctBag();
+        TreeSet<Tuple> rightAnswer = new TreeSet<Tuple>();
+
+        // Write the same tuples into both, forcing a spill after each of the 321 batches.
+        for (int j = 0; j < 321; j++) {
+            for (int i = 0; i < 50; i++) {
+                Tuple t = new Tuple(new DataAtom(rand.nextInt() % 5));
+                b.add(t);
+                rightAnswer.add(t);
+            }
+            mgr.forceSpill();
+        }
+
+        // Read tuples back; the bag must match the TreeSet's sorted, duplicate-free iteration.
+        Iterator<Tuple> bIter = b.iterator();
+        Iterator<Tuple> rIter = rightAnswer.iterator();
+
+        while (rIter.hasNext()) {
+            assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+            // JUnit's order is (message, expected, actual); rightAnswer is the expectation.
+            assertEquals("tuples should be the same", rIter.next(), bIter.next());
+        }
+
+        assertFalse("right answer ran out of tuples before the bag", bIter.hasNext());
+    }
+
+    // Test the default bag factory. The factory overrides are cleared first so the
+    // outcome does not depend on properties an earlier test may have left set.
+    @Test
+    public void testDefaultBagFactory() throws Exception {
+        System.clearProperty("pig.data.bag.factory.name");
+        System.clearProperty("pig.data.bag.factory.jar");
+        BagFactory.resetSelf();
+        BagFactory f = BagFactory.getInstance();
+
+        assertTrue("Expected a default bag", f.newDefaultBag() instanceof DefaultDataBag);
+        assertTrue("Expected a sorted bag", f.newSortedBag(null) instanceof SortedDataBag);
+        assertTrue("Expected a distinct bag", f.newDistinctBag() instanceof DistinctDataBag);
+    }
+
+    // Test selecting a bag factory through the pig.data.bag.factory.name/.jar system properties.
+    @Test
+    public void testProvidedBagFactory() throws Exception {
+        try {
+            // Test bogus factory name.
+            BagFactory.resetSelf();
+            System.setProperty("pig.data.bag.factory.name", "no such class");
+            System.setProperty("pig.data.bag.factory.jar", "file:./pig.jar");
+            boolean caughtIt = false;
+            try {
+                BagFactory.getInstance();
+            } catch (RuntimeException re) {
+                assertEquals("Expected Unable to instantiate message",
+                    "Unable to instantiate bag factory no such class", re.getMessage());
+                caughtIt = true;
+            }
+            assertTrue("Expected to catch exception", caughtIt);
+            // Test factory that isn't a BagFactory
+            BagFactory.resetSelf();
+            System.setProperty("pig.data.bag.factory.name", "org.apache.pig.test.TestDataBag");
+            System.setProperty("pig.data.bag.factory.jar", "file:./pig.jar");
+            caughtIt = false;
+            try {
+                BagFactory.getInstance();
+            } catch (RuntimeException re) {
+                assertEquals("Expected does not extend BagFactory message",
+                    "Provided factory org.apache.pig.test.TestDataBag does not extend BagFactory!", re.getMessage());
+                caughtIt = true;
+            }
+            assertTrue("Expected to catch exception", caughtIt);
+            // Test that we can instantiate our test factory.
+            BagFactory.resetSelf();
+            System.setProperty("pig.data.bag.factory.name", "org.apache.pig.test.NonDefaultBagFactory");
+            System.setProperty("pig.data.bag.factory.jar", "file:./pig.jar");
+            BagFactory f = BagFactory.getInstance();
+            DataBag b = f.newDefaultBag();
+            b = f.newSortedBag(null);
+            b = f.newDistinctBag();
-public void testWriteReadNone() throws Exception
-{
- DataBag before = new DataBag(Datum.DataType.CHARARRAY);
-
- String s = new String("zzz");
- before.add(new DataCharArrayNone(s.getBytes()));
- s = new String("yyy");
- before.add(new DataCharArrayNone(s.getBytes()));
- s = new String("xxx");
- before.add(new DataCharArrayNone(s.getBytes()));
-
- File file = null;
- file = File.createTempFile("DataBagCharArrayNone", "put");
- FileOutputStream fos = new FileOutputStream(file);
- DataOutput out = new DataOutputStream(fos);
- before.write(out);
- fos.close();
-
- FileInputStream fis = new FileInputStream(file);
- DataInput in = new DataInputStream(fis);
- Datum a = DatumImpl.readDatum(in);
-
- assertTrue("isa DataBag", a instanceof DataBag);
-
- DataBag after = (DataBag)a;
-
- assertTrue("bag of chararray", after.bagOf() == Datum.DataType.CHARARRAY);
- assertEquals("after read, size", 3, after.size());
-
- Iterator<Datum> j = after.content();
-
- Datum valAfter = j.next();
- assertTrue("should be a chararray", valAfter.getType() == Datum.DataType.CHARARRAY);
- assertTrue("encoding should be none",
- ((DataCharArray)valAfter).getEncoding() == DataCharArray.Encoding.NONE);
- for (int i = 0; i < 3; i++) {
- assertEquals("value of valAfter", (byte)0x7a,
- ((DataCharArrayNone)valAfter).get()[i]);
- }
-
- valAfter = j.next();
- assertTrue("should be a chararray", valAfter.getType() == Datum.DataType.CHARARRAY);
- assertTrue("encoding should be none",
- ((DataCharArray)valAfter).getEncoding() == DataCharArray.Encoding.NONE);
- for (int i = 0; i < 3; i++) {
- assertEquals("value of valAfter", (byte)0x79,
- ((DataCharArrayNone)valAfter).get()[i]);
- }
-
- valAfter = j.next();
- assertTrue("should be a chararray", valAfter.getType() == Datum.DataType.CHARARRAY);
- assertTrue("encoding should be none",
- ((DataCharArray)valAfter).getEncoding() == DataCharArray.Encoding.NONE);
- for (int i = 0; i < 3; i++) {
- assertEquals("value of valAfter", (byte)0x78,
- ((DataCharArrayNone)valAfter).get()[i]);
- }
-
- assertFalse("should have read all values in bag", j.hasNext());
-
- file.delete();
+        } finally {
+            // Always drop the overrides and the cached factory so later tests are unaffected.
+            System.clearProperty("pig.data.bag.factory.name");
+            System.clearProperty("pig.data.bag.factory.jar");
+            BagFactory.resetSelf();
+        }
+    }
}
-public void testWriteReadMap() throws Exception
-{
- DataBag before = new DataBag(Datum.DataType.MAP);
-
- DataMap map = new DataMap();
-
- DataInteger key = new DataInteger(1);
- Datum val = new DataInteger(99);
- map.put(key, val);
-
- before.add(map);
-
- File file = null;
- file = File.createTempFile("DataBagCharArrayNone", "put");
- FileOutputStream fos = new FileOutputStream(file);
- DataOutput out = new DataOutputStream(fos);
- before.write(out);
- fos.close();
-
- FileInputStream fis = new FileInputStream(file);
- DataInput in = new DataInputStream(fis);
- Datum a = DatumImpl.readDatum(in);
-
- assertTrue("isa DataBag", a instanceof DataBag);
-
- DataBag after = (DataBag)a;
-
- assertTrue("bag of maps", after.bagOf() == Datum.DataType.MAP);
- assertEquals("after read, size", 1, after.size());
-
- Iterator<Datum> j = after.content();
-
- Datum v = j.next();
- assertTrue("valAfter should be a map", v.getType() == Datum.DataType.MAP);
- DataMap valAfter = (DataMap)v;
-
- assertEquals("valAfter size", 1L, valAfter.size());
-
- DataInteger nosuch = new DataInteger(-1);
- Datum d = valAfter.get(nosuch);
- assertTrue("after read, no such key", d.isNull());
-
- Datum mapValAfter = valAfter.get(key);
- assertTrue("mapValAfter isa integer", mapValAfter instanceof DataInteger);
- assertEquals("value of valAfter", 99, ((DataInteger)mapValAfter).get());
-
- assertFalse("should have read all values in bag", j.hasNext());
-
- file.delete();
-}
-
-public void testWriteReadTuple() throws Exception
-{
- DataBag before = new DataBag(Datum.DataType.TUPLE);
-
- Tuple t = new Tuple(1);
- t.setField(0, new DataInteger(1));
- before.add(t);
-
- File file = null;
- file = File.createTempFile("DataBagCharArrayNone", "put");
- FileOutputStream fos = new FileOutputStream(file);
- DataOutput out = new DataOutputStream(fos);
- before.write(out);
- fos.close();
-
- FileInputStream fis = new FileInputStream(file);
- DataInput in = new DataInputStream(fis);
- Datum a = DatumImpl.readDatum(in);
-
- assertTrue("isa DataBag", a instanceof DataBag);
-
- DataBag after = (DataBag)a;
-
- assertTrue("bag of tuples", after.bagOf() == Datum.DataType.TUPLE);
- assertEquals("after read, size", 1, after.size());
-
- Iterator<Datum> j = after.content();
-
- Datum v = j.next();
- assertTrue("valAfter should be a tuple",
- v.getType() == Datum.DataType.TUPLE);
-
- Tuple valAfter = (Tuple)v;
-
- assertEquals("valAfter size", 1L, valAfter.size());
-
- Datum tupleValAfter = valAfter.getField(0);
- assertTrue("tupleValAfter isa integer", tupleValAfter instanceof DataInteger);
- assertEquals("value of valAfter", 1, ((DataInteger)tupleValAfter).get());
-
- assertFalse("should have read all values in bag", j.hasNext());
-
- file.delete();
-}
-
-public void testWriteReadBag() throws Exception
-{
- DataBag before = new DataBag(Datum.DataType.BAG);
-
- DataBag b = new DataBag(Datum.DataType.INT);
- b.add(new DataInteger(2));
- before.add(b);
-
- File file = null;
- file = File.createTempFile("DataBagCharArrayNone", "put");
- FileOutputStream fos = new FileOutputStream(file);
- DataOutput out = new DataOutputStream(fos);
- before.write(out);
- fos.close();
-
- FileInputStream fis = new FileInputStream(file);
- DataInput in = new DataInputStream(fis);
- Datum a = DatumImpl.readDatum(in);
-
- assertTrue("isa DataBag", a instanceof DataBag);
-
- DataBag after = (DataBag)a;
-
- assertTrue("bag of bags", after.bagOf() == Datum.DataType.BAG);
- assertEquals("after read, size", 1, after.size());
-
- Iterator<Datum> j = after.content();
-
- Datum v = j.next();
- assertTrue("valAfter should be a bag", v.getType() == Datum.DataType.BAG);
- DataBag valAfter = (DataBag)v;
-
- assertEquals("valAfter size", 1L, valAfter.size());
-
- Iterator<Datum> k = valAfter.content();
- Datum w = k.next();
- assertTrue("bagValAfter should be an integer",
- w.getType() == Datum.DataType.INT);
- DataInteger bagValAfter = (DataInteger)w;
-
- assertEquals("value of valAfter", 2, bagValAfter.get());
-
- assertFalse("should have read all values in inner bag", k.hasNext());
- assertFalse("should have read all values in bag", j.hasNext());
-
- file.delete();
-}
-
-}
-