You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/01/04 23:58:23 UTC
svn commit: r609048 [2/2] - in /incubator/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/builtin/ src/org/apache/pig/data/
src/org/apache/pig/impl/ src/org/apache/pig/impl/builtin/
src/org/apache/pig/impl/eval/ src/org/apache/pig/impl/eval/collec...
Modified: incubator/pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/PigContext.java Fri Jan 4 14:58:20 2008
@@ -56,6 +56,7 @@
import org.apache.pig.impl.mapreduceExec.MapReduceLauncher;
import org.apache.pig.impl.mapreduceExec.PigMapReduce;
import org.apache.pig.impl.util.JarManager;
+import org.apache.pig.impl.util.PigLogger;
import org.apache.pig.shock.SSHSocketImplFactory;
@@ -90,7 +91,6 @@
// connection to hadoop jobtracker (stays as null if doing local execution)
transient private JobSubmissionProtocol jobTracker;
transient private JobClient jobClient;
- transient private Logger mLogger;
private String jobName = JOB_NAME_PREFIX; // can be overwritten by users
@@ -110,8 +110,6 @@
public PigContext(ExecType execType){
this.execType = execType;
- mLogger = Logger.getLogger("org.apache.pig");
-
initProperties();
String pigJar = JarManager.findContainingJar(Main.class);
@@ -131,6 +129,8 @@
}
private void initProperties() {
+ Logger log = PigLogger.getLogger();
+
Properties fileProperties = new Properties();
try{
@@ -155,15 +155,16 @@
//Now set these as system properties only if they are not already defined.
for (Object o: fileProperties.keySet()){
String propertyName = (String)o;
- mLogger.debug("Found system property " + propertyName + " in .pigrc");
+ log.debug("Found system property " + propertyName + " in .pigrc");
if (System.getProperty(propertyName) == null){
System.setProperty(propertyName, fileProperties.getProperty(propertyName));
- mLogger.debug("Setting system property " + propertyName);
+ log.debug("Setting system property " + propertyName);
}
}
}
public void connect(){
+ Logger log = PigLogger.getLogger();
try{
if (execType != ExecType.LOCAL){
//First set the ssh socket factory
@@ -199,10 +200,10 @@
lfs = FileSystem.getNamed("local", conf);
- mLogger.info("Connecting to hadoop file system at: " + conf.get("fs.default.name"));
+ log.info("Connecting to hadoop file system at: " + conf.get("fs.default.name"));
dfs = FileSystem.get(conf);
- mLogger.info("Connecting to map-reduce job tracker at: " + conf.get("mapred.job.tracker"));
+ log.info("Connecting to map-reduce job tracker at: " + conf.get("mapred.job.tracker"));
jobTracker = (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
JobSubmissionProtocol.versionID, JobTracker.getAddress(conf), conf);
jobClient = new JobClient(conf);
@@ -233,6 +234,7 @@
};
private String[] doHod(String server) {
+ Logger log = PigLogger.getLogger();
if (hodMapRed != null) {
return new String[] {hodHDFS, hodMapRed};
}
@@ -250,7 +252,10 @@
cmd.append(System.getProperty("hod.command"));
//String cmd = System.getProperty("hod.command", "/home/breed/startHOD.expect");
String cluster = System.getProperty("yinst.cluster");
- if (cluster.length() > 0 && !cluster.startsWith("kryptonite")) {
+ // TODO This is a Yahoo specific holdover, need to remove
+ // this.
+ if (cluster != null && cluster.length() > 0 &&
+ !cluster.startsWith("kryptonite")) {
cmd.append(" --config=");
cmd.append(System.getProperty("hod.config.dir"));
cmd.append('/');
@@ -264,8 +269,8 @@
p = fac.ssh(cmd.toString());
}
InputStream is = p.getInputStream();
- mLogger.info("Connecting to HOD...");
- mLogger.debug("sending HOD command " + cmd.toString());
+ log.info("Connecting to HOD...");
+ log.debug("sending HOD command " + cmd.toString());
StringBuffer sb = new StringBuffer();
int c;
String hdfsUI = null;
@@ -279,23 +284,23 @@
switch(current) {
case HDFSUI:
hdfsUI = sb.toString().trim();
- mLogger.info("HDFS Web UI: " + hdfsUI);
+ log.info("HDFS Web UI: " + hdfsUI);
break;
case HDFS:
hdfs = sb.toString().trim();
- mLogger.info("HDFS: " + hdfs);
+ log.info("HDFS: " + hdfs);
break;
case MAPREDUI:
mapredUI = sb.toString().trim();
- mLogger.info("JobTracker Web UI: " + mapredUI);
+ log.info("JobTracker Web UI: " + mapredUI);
break;
case MAPRED:
mapred = sb.toString().trim();
- mLogger.info("JobTracker: " + mapred);
+ log.info("JobTracker: " + mapred);
break;
case HADOOPCONF:
hadoopConf = sb.toString().trim();
- mLogger.info("HadoopConf: " + hadoopConf);
+ log.info("HadoopConf: " + hadoopConf);
break;
}
current = ParsingState.NOTHING;
@@ -341,7 +346,7 @@
throw new IOException("Missing Hadoop configuration file");
return new String[] {hdfs, mapred};
} catch (Exception e) {
- mLogger.fatal("Could not connect to HOD", e);
+ log.fatal("Could not connect to HOD", e);
System.exit(4);
}
throw new RuntimeException("Could not scrape needed information.");
@@ -415,10 +420,6 @@
public JobConf getConf() {
return conf;
}
-
- public Logger getLogger() {
- return mLogger;
- }
public void setJobtrackerLocation(String newLocation) {
conf.set("mapred.job.tracker", newLocation);
Modified: incubator/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java Fri Jan 4 14:58:20 2008
@@ -37,12 +37,12 @@
int numQuantiles = input.getAtomField(0).numval().intValue();
DataBag samples = input.getBagField(1);
- int numSamples = samples.cardinality();
+ long numSamples = samples.size();
- int toSkip = numSamples / numQuantiles;
+ long toSkip = numSamples / numQuantiles;
- int i=0, nextQuantile = 0;
- Iterator<Tuple> iter = samples.content();
+ long i=0, nextQuantile = 0;
+ Iterator<Tuple> iter = samples.iterator();
while (iter.hasNext()){
Tuple t = iter.next();
if (i==nextQuantile){
Modified: incubator/pig/trunk/src/org/apache/pig/impl/builtin/ShellBagEvalFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/builtin/ShellBagEvalFunc.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/builtin/ShellBagEvalFunc.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/builtin/ShellBagEvalFunc.java Fri Jan 4 14:58:20 2008
@@ -22,6 +22,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.Iterator;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
@@ -47,9 +48,16 @@
this.cmd = cmd;
}
- private class EndOfQueue extends DataBag{
- public void add(Datum d){}
- }
+ private class EndOfQueue extends DataBag {
+ @Override
+ public void add(Tuple t){}
+
+ // To satisfy abstract functions in DataBag.
+ public boolean isSorted() { return false; }
+ public boolean isDistinct() { return false; }
+ public Iterator<Tuple> iterator() { return null; }
+ public long spill() { return 0; }
+ }
private void startProcess() throws IOException {
Process p = Runtime.getRuntime().exec(cmd);
Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/FuncEvalSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/FuncEvalSpec.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/FuncEvalSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/FuncEvalSpec.java Fri Jan 4 14:58:20 2008
@@ -21,6 +21,7 @@
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
+import java.util.Iterator;
import org.apache.pig.EvalFunc;
import org.apache.pig.Algebraic;
@@ -158,6 +159,13 @@
public FakeDataBag(DataCollector successor){
this.successor = successor;
}
+
+ // To satisfy abstract functions in DataBag.
+ public boolean isSorted() { return false; }
+ public boolean isDistinct() { return false; }
+ public Iterator<Tuple> iterator() { return null; }
+ public long spill() { return 0; }
+
void addStart(){
successor.add(DataBag.startBag);
Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/GenerateSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/GenerateSpec.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/GenerateSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/GenerateSpec.java Fri Jan 4 14:58:20 2008
@@ -110,11 +110,7 @@
DataBag bag;
public DatumBag(){
super(null);
- try{
- bag = BagFactory.getInstance().getNewBag();
- }catch(IOException e){
- throw new RuntimeException(e);
- }
+ bag = BagFactory.getInstance().newDefaultBag();
}
@Override
@@ -126,7 +122,7 @@
return new Iterator<Datum>(){
Iterator<Tuple> iter;
{
- iter = bag.content();
+ iter = bag.iterator();
}
public boolean hasNext() {
return iter.hasNext();
Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/SortDistinctSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/SortDistinctSpec.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/SortDistinctSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/SortDistinctSpec.java Fri Jan 4 14:58:20 2008
@@ -74,16 +74,11 @@
}else{
if (checkDelimiter(d)){
//Bag must have started now
- try{
- bag = BagFactory.getInstance().getNewBag();
- if (eliminateDuplicates)
- bag.distinct();
- else
- bag.sort(sortSpec);
-
- }catch(IOException e){
- throw new RuntimeException(e);
- }
+ if (eliminateDuplicates) {
+ bag = BagFactory.getInstance().newDistinctBag();
+ } else {
+ bag = BagFactory.getInstance().newSortedBag(sortSpec);
+ }
}else{
addToSuccessor(d);
}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java Fri Jan 4 14:58:20 2008
@@ -84,7 +84,7 @@
DataBag bag = (DataBag)d;
//flatten the bag and send it through the pipeline
successor.add(DataBag.startBag);
- Iterator<Tuple> iter = bag.content();
+ Iterator<Tuple> iter = bag.iterator();
while(iter.hasNext())
successor.add(iter.next());
successor.add(DataBag.endBag);
Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/UnflattenCollector.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/UnflattenCollector.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/UnflattenCollector.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/UnflattenCollector.java Fri Jan 4 14:58:20 2008
@@ -47,11 +47,7 @@
}else{
if (checkDelimiter(d)){
//Bag must have started now
- try{
- bag = BagFactory.getInstance().getNewBag();
- }catch(IOException e){
- throw new RuntimeException(e);
- }
+ bag = BagFactory.getInstance().newDefaultBag();
}else{
successor.add(d);
}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileReader.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileReader.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileReader.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileReader.java Fri Jan 4 14:58:20 2008
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+/*
package org.apache.pig.impl.io;
import java.io.BufferedInputStream;
@@ -97,3 +98,4 @@
store.delete();
}
}
+*/
Modified: incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileWriter.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileWriter.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileWriter.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileWriter.java Fri Jan 4 14:58:20 2008
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+ /*
package org.apache.pig.impl.io;
import java.io.BufferedOutputStream;
@@ -66,3 +67,4 @@
}
}
+*/
Modified: incubator/pig/trunk/src/org/apache/pig/impl/io/PigFile.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/io/PigFile.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/io/PigFile.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/io/PigFile.java Fri Jan 4 14:58:20 2008
@@ -44,7 +44,7 @@
}
public DataBag load(LoadFunc lfunc, PigContext pigContext) throws IOException {
- DataBag content = BagFactory.getInstance().getNewBag();
+ DataBag content = BagFactory.getInstance().newDefaultBag();
InputStream is = FileLocalizer.open(file, pigContext);
lfunc.bindTo(file, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
Tuple f = null;
@@ -58,7 +58,7 @@
public void store(DataBag data, StoreFunc sfunc, PigContext pigContext) throws IOException {
BufferedOutputStream bos = new BufferedOutputStream(FileLocalizer.create(file, append, pigContext));
sfunc.bindTo(bos);
- for (Iterator<Tuple> it = data.content(); it.hasNext();) {
+ for (Iterator<Tuple> it = data.iterator(); it.hasNext();) {
Tuple row = it.next();
sfunc.putNext(row);
}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java Fri Jan 4 14:58:20 2008
@@ -27,12 +27,14 @@
import org.apache.log4j.Logger;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.DataBag;
+import org.apache.pig.data.BagFactory;
import org.apache.pig.data.IndexedTuple;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.eval.EvalSpec;
import org.apache.pig.impl.io.PigFile;
import org.apache.pig.impl.physicalLayer.POMapreduce;
import org.apache.pig.impl.util.JarManager;
+import org.apache.pig.impl.util.PigLogger;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.hadoop.fs.Path;
@@ -99,7 +101,7 @@
*/
public boolean launchPig(POMapreduce pom) throws IOException {
- Logger log = pom.pigContext.getLogger();
+ Logger log = PigLogger.getLogger();
JobConf conf = new JobConf(pom.pigContext.getConf());
conf.setJobName(pom.pigContext.getJobName());
boolean success = false;
@@ -120,7 +122,7 @@
{
FileOutputStream fos = new FileOutputStream(submitJarFile);
JarManager.createJar(fos, funcs, pom.pigContext);
- System.out.println("Job jar size = " + submitJarFile.length());
+ log.debug("Job jar size = " + submitJarFile.length());
conf.setJar(submitJarFile.getPath());
String user = System.getProperty("user.name");
conf.setUser(user != null ? user : "Pigster");
@@ -228,7 +230,8 @@
// create an empty output file
PigFile f = new PigFile(outputFile.toString(), false);
- f.store(new DataBag(), new PigStorage(), pom.pigContext);
+ f.store(BagFactory.getInstance().newDefaultBag(),
+ new PigStorage(), pom.pigContext);
}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/PigCombine.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/PigCombine.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/PigCombine.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/PigCombine.java Fri Jan 4 14:58:20 2008
@@ -28,7 +28,7 @@
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.BigDataBag;
+import org.apache.pig.data.DataBag;
import org.apache.pig.data.Datum;
import org.apache.pig.data.IndexedTuple;
import org.apache.pig.data.Tuple;
@@ -47,17 +47,19 @@
private OutputCollector oc;
private int index;
private int inputCount;
- private BigDataBag bags[];
- private File tmpdir;
+ private DataBag bags[];
+ // private File tmpdir;
public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
throws IOException {
try {
+ /*
tmpdir = new File(job.get("mapred.task.id"));
tmpdir.mkdirs();
BagFactory.init(tmpdir);
+ */
PigContext pigContext = (PigContext) ObjectSerializer.deserialize(job.get("pig.pigContext"));
if (evalPipe == null) {
inputCount = ((ArrayList<FileSpec>)ObjectSerializer.deserialize(job.get("pig.inputs"))).size();
@@ -69,9 +71,9 @@
evalPipe = esp.setupPipe(finalout);
//throw new RuntimeException("combine spec: " + evalSpec + " combine pipe: " + esp.toString());
- bags = new BigDataBag[inputCount];
+ bags = new DataBag[inputCount];
for (int i = 0; i < inputCount; i++) {
- bags[i] = BagFactory.getInstance().getNewBigBag();
+ bags[i] = BagFactory.getInstance().newDefaultBag();
}
}
@@ -94,7 +96,7 @@
t.getBagField(it.index + 1).add(it.toTuple());
}
for (int i = 0; i < inputCount; i++) { // XXX: shouldn't we only do this if INNER flag is set?
- if (t.getBagField(1 + i).isEmpty())
+ if (t.getBagField(1 + i).size() == 0)
return;
}
// throw new RuntimeException("combine input: " + t.toString());
Modified: incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/PigMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/PigMapReduce.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/PigMapReduce.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/PigMapReduce.java Fri Jan 4 14:58:20 2008
@@ -87,7 +87,7 @@
private int index;
private int inputCount;
private boolean isInner[];
- private File tmpdir;
+ // private File tmpdir;
private static PigContext pigContext = null;
ArrayList<PigRecordWriter> sideFileWriters = new ArrayList<PigRecordWriter>();
@@ -100,9 +100,11 @@
PigMapReduce.reporter = reporter;
oc = output;
+ /*
tmpdir = new File(job.get("mapred.task.id"));
tmpdir.mkdirs();
BagFactory.init(tmpdir);
+ */
setupMapPipe(reporter);
@@ -125,10 +127,12 @@
PigMapReduce.reporter = reporter;
try {
+ /*
tmpdir = new File(job.get("mapred.task.id"));
tmpdir.mkdirs();
BagFactory.init(tmpdir);
+ */
oc = output;
if (evalPipe == null) {
@@ -140,7 +144,7 @@
Tuple t = new Tuple(1 + inputCount);
t.setField(0, groupName);
for (int i = 1; i < 1 + inputCount; i++) {
- bags[i - 1] = BagFactory.getInstance().getNewBag();
+ bags[i - 1] = BagFactory.getInstance().newDefaultBag();
t.setField(i, bags[i - 1]);
}
@@ -150,7 +154,7 @@
}
for (int i = 0; i < inputCount; i++) {
- if (isInner[i] && t.getBagField(1 + i).isEmpty())
+ if (isInner[i] && t.getBagField(1 + i).size() == 0)
return;
}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/IntermedResult.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/IntermedResult.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/IntermedResult.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/IntermedResult.java Fri Jan 4 14:58:20 2008
@@ -24,6 +24,7 @@
import org.apache.pig.PigServer.ExecType;
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.data.DataBag;
+import org.apache.pig.data.BagFactory;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
@@ -58,7 +59,7 @@
public IntermedResult() {
executed = true;
- databag = new DataBag();
+ databag = BagFactory.getInstance().newDefaultBag();
}
public IntermedResult(DataBag bag) {
Modified: incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POCogroup.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POCogroup.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POCogroup.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POCogroup.java Fri Jan 4 14:58:20 2008
@@ -121,7 +121,7 @@
boolean done = true;
for (int i = 0; i < inputs.length; i++) {
- DataBag b = BagFactory.getInstance().getNewBag();
+ DataBag b = BagFactory.getInstance().newDefaultBag();
while (sortedInputs[i].size() > 0) {
Datum g = sortedInputs[i].get(0)[0];
@@ -139,7 +139,7 @@
}
}
- if (specs.get(i).isInner() && b.isEmpty())
+ if (specs.get(i).isInner() && (b.size() == 0))
done = false; // this input uses "inner" semantics, and it has no tuples for
// this group, so suppress the tuple we're currently building
Modified: incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POMapreduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POMapreduce.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POMapreduce.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POMapreduce.java Fri Jan 4 14:58:20 2008
@@ -21,6 +21,8 @@
import java.util.ArrayList;
import java.util.Comparator;
+import org.apache.log4j.Logger;
+
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.pig.data.Tuple;
@@ -30,7 +32,7 @@
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.mapreduceExec.MapReduceLauncher;
import org.apache.pig.impl.util.ObjectSerializer;
-
+import org.apache.pig.impl.util.PigLogger;
public class POMapreduce extends PhysicalOperator {
private static final long serialVersionUID = 1L;
@@ -162,16 +164,16 @@
}
void print() {
- System.out.println("\n----- MapReduce Job -----");
- System.out.println("Input: " + inputFileSpecs);
- System.out.println("Map: " + toMap);
- System.out.println("Group: " + groupFuncs);
- System.out.println("Combine: " + toCombine);
- System.out.println("Reduce: " + toReduce);
- System.out.println("Output: " + outputFileSpec);
- System.out.println("Split: " + toSplit);
- System.out.println("Map parallelism: " + mapParallelism);
- System.out.println("Reduce parallelism: " + reduceParallelism);
+ Logger log = PigLogger.getLogger();
+ log.debug("Input: " + inputFileSpecs);
+ log.debug("Map: " + toMap);
+ log.debug("Group: " + groupFuncs);
+ log.debug("Combine: " + toCombine);
+ log.debug("Reduce: " + toReduce);
+ log.debug("Output: " + outputFileSpec);
+ log.debug("Split: " + toSplit);
+ log.debug("Map parallelism: " + mapParallelism);
+ log.debug("Reduce parallelism: " + reduceParallelism);
}
public POMapreduce copy(){
Modified: incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/PORead.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/PORead.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/PORead.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/PORead.java Fri Jan 4 14:58:20 2008
@@ -46,7 +46,7 @@
if (continueFromLast){
throw new RuntimeException("LOReads should not occur in continuous plans");
}
- it = bag.content();
+ it = bag.iterator();
return true;
}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POSort.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POSort.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POSort.java Fri Jan 4 14:58:20 2008
@@ -43,14 +43,13 @@
public boolean open(boolean continueFromLast) throws IOException {
if (!super.open(continueFromLast))
return false;
- DataBag bag = BagFactory.getInstance().getNewBag();
+ DataBag bag = BagFactory.getInstance().newSortedBag(sortSpec);
- bag.sort(sortSpec);
Tuple t;
while((t = inputs[0].getNext())!=null){
bag.add(t);
}
- iter = bag.content();
+ iter = bag.iterator();
return true;
}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POStore.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POStore.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POStore.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POStore.java Fri Jan 4 14:58:20 2008
@@ -21,6 +21,7 @@
import org.apache.pig.StoreFunc;
import org.apache.pig.data.DataBag;
+import org.apache.pig.data.BagFactory;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
@@ -62,7 +63,7 @@
@Override
public Tuple getNext() throws IOException {
// get all tuples from input, and store them.
- DataBag b = new DataBag();
+ DataBag b = BagFactory.getInstance().newDefaultBag();
Tuple t;
while ((t = (Tuple) inputs[0].getNext()) != null) {
b.add(t);
Modified: incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/PhysicalPlan.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/PhysicalPlan.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/PhysicalPlan.java Fri Jan 4 14:58:20 2008
@@ -21,6 +21,7 @@
import java.util.Map;
import org.apache.pig.data.DataBag;
+import org.apache.pig.data.BagFactory;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
@@ -33,7 +34,7 @@
}
public DataBag exec(boolean continueFromLast) throws IOException {
- DataBag results = new DataBag();
+ DataBag results = BagFactory.getInstance().newDefaultBag();
root.open(continueFromLast);
Tuple t;
Added: incubator/pig/trunk/src/org/apache/pig/impl/util/PigLogger.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/util/PigLogger.java?rev=609048&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/util/PigLogger.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/util/PigLogger.java Fri Jan 4 14:58:20 2008
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.util;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.Level;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.PatternLayout;
+
+public class PigLogger
+{
+// Shared "org.apache.pig" log4j logger, created lazily by getLogger().
+private static Logger mLogger = null;
+private static boolean mHaveSetAppenders = false; // true once setAppenderForJunit() has attached its appender
+
+/**
+ * Get the shared log4j logger named "org.apache.pig", creating it on
+ * first use and returning the cached instance on later calls.
+ */
+public static Logger getLogger()
+{
+ if (mLogger == null) { // unsynchronized lazy init
+ mLogger = Logger.getLogger("org.apache.pig"); // presumably benign if raced: log4j hands back the same Logger per name
+ }
+ return mLogger;
+}
+
+/**
+ * Attach an INFO-level console appender writing to stderr so that JUnit
+ * tests can write out log messages; only the first call has any effect.
+ */
+public static void setAppenderForJunit()
+{
+ if (!mHaveSetAppenders) { // NOTE(review): not synchronized; concurrent first calls could add two appenders
+ Logger log = getLogger();
+ log.setLevel(Level.INFO);
+ ConsoleAppender screen = new ConsoleAppender(new PatternLayout());
+ screen.setThreshold(Level.INFO);
+ screen.setTarget(ConsoleAppender.SYSTEM_ERR);
+ log.addAppender(screen);
+ mHaveSetAppenders = true;
+ }
+}
+
+
+}
Added: incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=609048&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java Fri Jan 4 14:58:20 2008
@@ -0,0 +1,145 @@
+package org.apache.pig.impl.util;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryNotificationInfo;
+import java.lang.management.MemoryPoolMXBean;
+import java.lang.management.MemoryType;
+import java.lang.ref.WeakReference;
+import java.util.LinkedList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.management.Notification;
+import javax.management.NotificationEmitter;
+import javax.management.NotificationListener;
+import javax.management.openmbean.CompositeData;
+
+import org.apache.log4j.Logger;
+
+/**
+ * This class Tracks the tenured pool and a list of Spillable objects. When memory gets low, this
+ * class will start requesting Spillable objects to free up memory.
+ * <p>
+ * Low memory is defined as more than 50% of the tenured pool being allocated. Spillable objects are
+ * tracked using WeakReferences so that the objects can be GCed even though this class has a reference
+ * to them.
+ *
+ */
+public class SpillableMemoryManager implements NotificationListener {
+ List<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>();
+
+ public SpillableMemoryManager() {
+ ((NotificationEmitter)ManagementFactory.getMemoryMXBean()).addNotificationListener(this, null, null);
+ List<MemoryPoolMXBean> mpbeans = ManagementFactory.getMemoryPoolMXBeans();
+ MemoryPoolMXBean biggestHeap = null;
+ long biggestSize = 0;
+ for (MemoryPoolMXBean b: mpbeans) {
+ PigLogger.getLogger().debug("Found heap (" + b.getName() +
+ ") of type " + b.getType());
+ if (b.getType() == MemoryType.HEAP) {
+ /* Here we are making the leap of faith that the biggest
+ * heap is the tenured heap
+ */
+ long size = b.getUsage().getMax();
+ if (size > biggestSize) {
+ biggestSize = size;
+ biggestHeap = b;
+ }
+ }
+ }
+ if (biggestHeap == null) {
+ throw new RuntimeException("Couldn't find heap");
+ }
+ PigLogger.getLogger().debug("Selected heap to monitor (" +
+ biggestHeap.getName() + ")");
+ /* We set the threshold to be 50% of tenured since that is where
+ * the GC starts to dominate CPU time according to Sun doc */
+ biggestHeap.setCollectionUsageThreshold((long)(biggestSize*.5));
+ }
+
+ public void handleNotification(Notification n, Object o) {
+ CompositeData cd = (CompositeData) n.getUserData();
+ MemoryNotificationInfo info = MemoryNotificationInfo.from(cd);
+ PigLogger.getLogger().info("low memory handler called " + info.getUsage());
+ long toFree = info.getUsage().getUsed() - (long)(info.getUsage().getMax()*.5);
+ if (toFree < 0) {
+ PigLogger.getLogger().debug("low memory handler returning " +
+ "because there is nothing to free");
+ return;
+ }
+ synchronized(spillables) {
+ // Walk the list first and remove nulls, otherwise the sort
+ // takes way too long.
+ Iterator<WeakReference<Spillable>> i;
+ for (i = spillables.iterator(); i.hasNext();) {
+ Spillable s = i.next().get();
+ if (s == null) {
+ i.remove();
+ }
+ }
+ Collections.sort(spillables, new Comparator<WeakReference<Spillable>>() {
+
+ /**
+ * We don't lock anything, so this sort may not be stable if a WeakReference suddenly
+ * becomes null, but it will be close enough.
+ */
+ @Override
+ public int compare(WeakReference<Spillable> o1Ref, WeakReference<Spillable> o2Ref) {
+ Spillable o1 = o1Ref.get();
+ Spillable o2 = o2Ref.get();
+ if (o1 == null && o2 == null) {
+ return 0;
+ }
+ if (o1 == null) {
+ return -1;
+ }
+ if (o2 == null) {
+ return 1;
+ }
+ long o1Size = o1.getMemorySize();
+ long o2Size = o2.getMemorySize();
+
+ if (o1Size == o2Size) {
+ return 0;
+ }
+ if (o1Size < o2Size) {
+ return -1;
+ }
+ return 1;
+ }
+ });
+ long estimatedFreed = 0;
+ for (i = spillables.iterator(); i.hasNext();) {
+ Spillable s = i.next().get();
+ // Still need to check for null here, even after we removed
+ // above, because the reference may have gone bad on us
+ // since the last check.
+ if (s == null) {
+ i.remove();
+ continue;
+ }
+ long toBeFreed = s.getMemorySize();
+ s.spill();
+ estimatedFreed += toBeFreed;
+ if (estimatedFreed > toFree) {
+ break;
+ }
+ }
+ /* Poke the GC again to see if we successfully freed enough memory */
+ System.gc();
+ }
+ }
+
+ /**
+ * Register a spillable to be tracked. No need to unregister, the tracking will stop
+ * when the spillable is GCed.
+ * @param s the spillable to track.
+ */
+ public void registerSpillable(Spillable s) {
+ synchronized(spillables) {
+ spillables.add(new WeakReference<Spillable>(s));
+ }
+ }
+}
Modified: incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Fri Jan 4 14:58:20 2008
@@ -98,6 +98,10 @@
protected void processDescribe(String alias) throws IOException {
mPigServer.dumpSchema(alias);
}
+
+    /**
+     * Handles the grunt "explain" command by delegating to the PigServer's
+     * explain method, passing System.out as the output stream.
+     *
+     * @param alias the alias named on the command line
+     */
+    protected void processExplain(String alias) throws IOException {
+        java.io.PrintStream destination = System.out;
+        mPigServer.explain(alias, destination);
+    }
protected void processRegister(String jar) throws IOException {
mPigServer.registerJar(jar);
@@ -300,4 +304,4 @@
private JobClient mJobClient;
private boolean mDone;
-}
\ No newline at end of file
+}
Modified: incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj (original)
+++ incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj Fri Jan 4 14:58:20 2008
@@ -47,6 +47,8 @@
abstract protected void processRegisterFunc(String name, String expr);
abstract protected void processDescribe(String alias) throws IOException;
+
+ abstract protected void processExplain(String alias) throws IOException;
abstract protected void processRegister(String jar) throws IOException;
@@ -113,6 +115,7 @@
TOKEN: {<DEFINE: "define">}
TOKEN: {<DUMP: "dump">}
TOKEN: {<DESCRIBE: "describe">}
+TOKEN: {<EXPLAIN: "explain">}
TOKEN: {<HELP: "help">}
TOKEN: {<KILL: "kill">}
TOKEN: {<LS: "ls">}
@@ -301,6 +304,10 @@
t1 = <IDENTIFIER>
{processDescribe(t1.image);}
|
+ <EXPLAIN>
+ t1 = <IDENTIFIER>
+ {processExplain(t1.image);}
+ |
<HELP>
{printHelp();}
|
@@ -446,6 +453,8 @@
t = <DUMP>
|
t = <DESCRIBE>
+ |
+ t = <EXPLAIN>
|
t = <HELP>
|
Modified: incubator/pig/trunk/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestBuiltin.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestBuiltin.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestBuiltin.java Fri Jan 4 14:58:20 2008
@@ -91,7 +91,7 @@
Tuple t3 = new Tuple(2);
t3.setField(0, 82.0);
t3.setField(1, 17);
- DataBag bag = new DataBag();
+ DataBag bag = BagFactory.getInstance().newDefaultBag();
bag.add(t1);
bag.add(t2);
bag.add(t3);
@@ -332,6 +332,7 @@
assertTrue(f3.arity() == arity3);
}
+ /*
@Test
public void testLFBin() throws Exception {
@@ -395,6 +396,7 @@
assertTrue(r1.equals(t1));
assertTrue(r2.equals(t5));
}
+ */
@Test
@@ -488,8 +490,8 @@
for (int i=0; i< numTimes; i++){
Tuple t = iter.next();
- assertEquals(i+"AA", t.getBagField(0).content().next().getAtomField(0).strval());
- assertEquals(i+"BB", t.getBagField(1).content().next().getAtomField(0).strval());
+ assertEquals(i+"AA", t.getBagField(0).iterator().next().getAtomField(0).strval());
+ assertEquals(i+"BB", t.getBagField(1).iterator().next().getAtomField(0).strval());
}
Added: incubator/pig/trunk/test/org/apache/pig/test/TestDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestDataBag.java?rev=609048&view=auto
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestDataBag.java (added)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestDataBag.java Fri Jan 4 14:58:20 2008
@@ -0,0 +1,684 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+/*
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Iterator;
+import java.util.Random;
+*/
+
+import java.util.*;
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.pig.data.*;
+import org.apache.pig.impl.eval.*;
+import org.apache.pig.impl.util.Spillable;
+
+/**
+ * This class will exercise the basic Pig data model and members. It tests for proper behavior in
+ * assigment and comparision, as well as function application.
+ *
+ * @author dnm
+ */
+public class TestDataBag extends junit.framework.TestCase {
+
+ private Random rand = new Random();
+
+ private class TestMemoryManager {
+ ArrayList<Spillable> mManagedObjects = new ArrayList<Spillable>();
+
+ public void register(Spillable s) {
+ mManagedObjects.add(s);
+ }
+
+ public void forceSpill() throws IOException {
+ Iterator<Spillable> i = mManagedObjects.iterator();
+ while (i.hasNext()) i.next().spill();
+ }
+ }
+
+ // Need to override the regular bag factory so I can register with my local
+ // memory manager.
+ private class LocalBagFactory {
+ TestMemoryManager mMemMgr;
+
+ public LocalBagFactory(TestMemoryManager mgr) {
+ mMemMgr = mgr;
+ }
+
+ public DataBag newDefaultBag() {
+ DataBag bag = new DefaultDataBag();
+ mMemMgr.register(bag);
+ return bag;
+ }
+
+ public DataBag newSortedBag(EvalSpec sortSpec) {
+ DataBag bag = new SortedDataBag(sortSpec);
+ mMemMgr.register(bag);
+ return bag;
+ }
+
+ public DataBag newDistinctBag() {
+ DataBag bag = new DistinctDataBag();
+ mMemMgr.register(bag);
+ return bag;
+ }
+ }
+
+ // Test reading and writing default from memory, no spills.
+ @Test
+ public void testDefaultInMemory() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newDefaultBag();
+ ArrayList<Tuple> rightAnswer = new ArrayList<Tuple>(10);
+
+ // Write tuples into both
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(i));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+ Iterator<Tuple> rIter = rightAnswer.iterator();
+
+ while (rIter.hasNext()) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), rIter.next());
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+ // Test reading and writing default from file with one spill
+ @Test
+ public void testDefaultSingleSpill() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newDefaultBag();
+ ArrayList<Tuple> rightAnswer = new ArrayList<Tuple>(10);
+
+ // Write tuples into both
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(i));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+ mgr.forceSpill();
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+ Iterator<Tuple> rIter = rightAnswer.iterator();
+
+ while (rIter.hasNext()) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), rIter.next());
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+ // Test reading and writing default from file with three spills
+ @Test
+ public void testDefaultTripleSpill() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newDefaultBag();
+ ArrayList<Tuple> rightAnswer = new ArrayList<Tuple>(30);
+
+ // Write tuples into both
+ for (int j = 0; j < 3; j++) {
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(i));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+ mgr.forceSpill();
+ }
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+ Iterator<Tuple> rIter = rightAnswer.iterator();
+
+ while (rIter.hasNext()) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), rIter.next());
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+ // Test reading with some in file, some in memory.
+ @Test
+ public void testDefaultInMemInFile() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newDefaultBag();
+ ArrayList<Tuple> rightAnswer = new ArrayList<Tuple>(20);
+
+ // Write tuples into both
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(i));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+ mgr.forceSpill();
+
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(i));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+ Iterator<Tuple> rIter = rightAnswer.iterator();
+
+ while (rIter.hasNext()) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), rIter.next());
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+ // Test reading with a spill happening in the middle of the read.
+ @Test
+ public void testDefaultSpillDuringRead() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newDefaultBag();
+ ArrayList<Tuple> rightAnswer = new ArrayList<Tuple>(20);
+
+ // Write tuples into both
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(i));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+ mgr.forceSpill();
+
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(i));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+ Iterator<Tuple> rIter = rightAnswer.iterator();
+
+ for (int i = 0; i < 15; i++) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), rIter.next());
+ }
+
+ mgr.forceSpill();
+
+ while (rIter.hasNext()) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), rIter.next());
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+ // Test reading and writing sorted from memory, no spills.
+ @Test
+ public void testSortedInMemory() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newSortedBag(null);
+ PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(10);
+
+ // Write tuples into both
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+
+ Tuple t;
+ while ((t = rightAnswer.poll()) != null) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), t);
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+ // Test reading and writing default from file with one spill
+ @Test
+ public void testSortedSingleSpill() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newSortedBag(null);
+ PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(10);
+
+ // Write tuples into both
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+ mgr.forceSpill();
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+ Tuple t;
+ while ((t = rightAnswer.poll()) != null) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), t);
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+ // Test reading and writing default from file with three spills
+ @Test
+ public void testSortedTripleSpill() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newSortedBag(null);
+ PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(30);
+
+ // Write tuples into both
+ for (int j = 0; j < 3; j++) {
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+ mgr.forceSpill();
+ }
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+
+ Tuple t;
+ while ((t = rightAnswer.poll()) != null) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), t);
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+ // Test reading with some in file, some in memory.
+ @Test
+ public void testSortedInMemInFile() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newSortedBag(null);
+ PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(20);
+
+ // Write tuples into both
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+ mgr.forceSpill();
+
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+ Tuple t;
+ while ((t = rightAnswer.poll()) != null) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), t);
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+ // Test reading with a spill happening in the middle of the read.
+ @Test
+ public void testSortedSpillDuringRead() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newSortedBag(null);
+ PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(20);
+
+ // Write tuples into both
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+ mgr.forceSpill();
+
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+
+ for (int i = 0; i < 15; i++) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), rightAnswer.poll());
+ }
+
+ mgr.forceSpill();
+
+ Tuple t;
+ while ((t = rightAnswer.poll()) != null) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), t);
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+ // Test reading with first spill happening in the middle of the read.
+ @Test
+ public void testSortedFirstSpillDuringRead() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newSortedBag(null);
+ PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(20);
+
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+
+ for (int i = 0; i < 5; i++) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), rightAnswer.poll());
+ }
+
+ mgr.forceSpill();
+
+ Tuple t;
+ while ((t = rightAnswer.poll()) != null) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), t);
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+ // Test reading and writing sorted file with so many spills it requires
+ // premerge.
+ @Test
+ public void testSortedPreMerge() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newSortedBag(null);
+ PriorityQueue<Tuple> rightAnswer = new PriorityQueue<Tuple>(30);
+
+ // Write tuples into both
+ for (int j = 0; j < 373; j++) {
+ for (int i = 0; i < 10; i++) {
+ Tuple t = new Tuple(new DataAtom(rand.nextInt()));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+ mgr.forceSpill();
+ }
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+
+ Tuple t;
+ while ((t = rightAnswer.poll()) != null) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), t);
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+ // Test reading and writing distinct from memory, no spills.
+ @Test
+ public void testDistinctInMemory() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newDistinctBag();
+ TreeSet<Tuple> rightAnswer = new TreeSet<Tuple>();
+
+ // Write tuples into both
+ for (int i = 0; i < 50; i++) {
+ Tuple t = new Tuple(new DataAtom(rand.nextInt() % 5));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+ Iterator<Tuple> rIter = rightAnswer.iterator();
+
+ while (rIter.hasNext()) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), rIter.next());
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+ // Test reading and writing distinct from file with one spill
+ @Test
+ public void testDistinctSingleSpill() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newDistinctBag();
+ TreeSet<Tuple> rightAnswer = new TreeSet<Tuple>();
+
+ // Write tuples into both
+ for (int i = 0; i < 50; i++) {
+ Tuple t = new Tuple(new DataAtom(rand.nextInt() % 5));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+ mgr.forceSpill();
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+ Iterator<Tuple> rIter = rightAnswer.iterator();
+
+ while (rIter.hasNext()) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), rIter.next());
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+ // Test reading and writing distinct from file with three spills
+ @Test
+ public void testDistinctTripleSpill() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newDistinctBag();
+ TreeSet<Tuple> rightAnswer = new TreeSet<Tuple>();
+
+ // Write tuples into both
+ for (int j = 0; j < 3; j++) {
+ for (int i = 0; i < 50; i++) {
+ Tuple t = new Tuple(new DataAtom(rand.nextInt() % 5));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+ mgr.forceSpill();
+ }
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+ Iterator<Tuple> rIter = rightAnswer.iterator();
+
+ while (rIter.hasNext()) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), rIter.next());
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+ // Test reading with some in file, some in memory.
+ @Test
+ public void testDistinctInMemInFile() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newDistinctBag();
+ TreeSet<Tuple> rightAnswer = new TreeSet<Tuple>();
+
+ // Write tuples into both
+ for (int i = 0; i < 50; i++) {
+ Tuple t = new Tuple(new DataAtom(rand.nextInt() % 5));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+ mgr.forceSpill();
+
+ for (int i = 0; i < 50; i++) {
+ Tuple t = new Tuple(new DataAtom(i));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+ Iterator<Tuple> rIter = rightAnswer.iterator();
+
+ while (rIter.hasNext()) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), rIter.next());
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+ // Test reading with a spill happening in the middle of the read.
+ @Test
+ public void testDistinctSpillDuringRead() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newDistinctBag();
+ TreeSet<Tuple> rightAnswer = new TreeSet<Tuple>();
+
+ // Write tuples into both
+ for (int i = 0; i < 50; i++) {
+ Tuple t = new Tuple(new DataAtom(rand.nextInt() % 5));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+ mgr.forceSpill();
+
+ for (int i = 0; i < 50; i++) {
+ Tuple t = new Tuple(new DataAtom(i));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+ Iterator<Tuple> rIter = rightAnswer.iterator();
+
+ for (int i = 0; i < 5; i++) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), rIter.next());
+ }
+
+ mgr.forceSpill();
+
+ while (rIter.hasNext()) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), rIter.next());
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+ // Test reading and writing distinct from file with enough spills to
+ // force a pre-merge
+ @Test
+ public void testDistinctPreMerge() throws Exception {
+ TestMemoryManager mgr = new TestMemoryManager();
+ LocalBagFactory factory = new LocalBagFactory(mgr);
+ DataBag b = factory.newDistinctBag();
+ TreeSet<Tuple> rightAnswer = new TreeSet<Tuple>();
+
+ // Write tuples into both
+ for (int j = 0; j < 321; j++) {
+ for (int i = 0; i < 50; i++) {
+ Tuple t = new Tuple(new DataAtom(rand.nextInt() % 5));
+ b.add(t);
+ rightAnswer.add(t);
+ }
+ mgr.forceSpill();
+ }
+
+ // Read tuples back, hopefully they come out in the same order.
+ Iterator<Tuple> bIter = b.iterator();
+ Iterator<Tuple> rIter = rightAnswer.iterator();
+
+ while (rIter.hasNext()) {
+ assertTrue("bag ran out of tuples before answer", bIter.hasNext());
+ assertEquals("tuples should be the same", bIter.next(), rIter.next());
+ }
+
+ assertFalse("right answer ran out of tuples before the bag",
+ bIter.hasNext());
+ }
+
+}
+
+
+
Modified: incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java Fri Jan 4 14:58:20 2008
@@ -159,6 +159,7 @@
assertTrue(n1.arity() == n1Arity + n2Arity);
}
+ /*
@Test
public void testDataBag() throws Exception {
int[] input1 = { 1, 2, 3, 4, 5 };
@@ -217,6 +218,7 @@
Runtime.getRuntime().gc();
testBigDataBag(Runtime.getRuntime().maxMemory() - 1*1024*1024, 1000000);
}
+ */
private enum TestType {
PRE_SORT,
@@ -227,6 +229,7 @@
}
+ /*
private void testBigDataBag(long freeMemoryToMaintain, int numItems) throws Exception {
BigDataBag.FREE_MEMORY_TO_MAINTAIN = freeMemoryToMaintain;
Random r = new Random();
@@ -288,5 +291,6 @@
if (testType != TestType.NONE)
assertTrue(bag.numNotifies >= count/DataBag.notifyInterval);
}
+ */
}
Modified: incubator/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java Fri Jan 4 14:58:20 2008
@@ -37,9 +37,7 @@
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.builtin.TextLoader;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataMap;
-import org.apache.pig.data.Tuple;
+import org.apache.pig.data.*;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.PigFile;
@@ -134,7 +132,7 @@
@Test
public void testMapLookup() throws IOException{
PigServer pigServer = new PigServer(initString);
- DataBag b = new DataBag();
+ DataBag b = BagFactory.getInstance().newDefaultBag();
DataMap colors = new DataMap();
colors.put("apple","red");
colors.put("orange","orange");
Modified: incubator/pig/trunk/test/org/apache/pig/test/TestMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestMapReduce.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestMapReduce.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestMapReduce.java Fri Jan 4 14:58:20 2008
@@ -78,7 +78,7 @@
}
@Override
public void exec(Tuple input, DataBag output) throws IOException {
- Iterator<Tuple> it = (input.getBagField(0)).content();
+ Iterator<Tuple> it = (input.getBagField(0)).iterator();
while(it.hasNext()) {
Tuple t = it.next();
Tuple newT = new Tuple(2);
Modified: incubator/pig/trunk/test/org/apache/pig/test/TestPigFile.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestPigFile.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestPigFile.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestPigFile.java Fri Jan 4 14:58:20 2008
@@ -33,18 +33,14 @@
import org.apache.pig.PigServer;
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.data.DataAtom;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataMap;
-import org.apache.pig.data.Datum;
-import org.apache.pig.data.Tuple;
+import org.apache.pig.data.*;
import org.apache.pig.PigServer.ExecType;
import org.apache.pig.impl.io.PigFile;
import org.apache.pig.impl.PigContext;
public class TestPigFile extends TestCase {
- DataBag bag = new DataBag();
+ DataBag bag = BagFactory.getInstance().newDefaultBag();
Random rand = new Random();
@Override
@@ -89,10 +85,10 @@
DataBag loaded = load.load(new PigStorage(), pigContext);
System.out.println("Done.");
- assertTrue(bag.cardinality() == loaded.cardinality());
+ assertTrue(bag.size() == loaded.size());
- Iterator<Tuple> it1 = bag.content();
- Iterator<Tuple> it2 = loaded.content();
+ Iterator<Tuple> it1 = bag.iterator();
+ Iterator<Tuple> it2 = loaded.iterator();
while (it1.hasNext() && it2.hasNext()) {
Tuple f1 = it1.next();
Tuple f2 = it2.next();
@@ -131,7 +127,7 @@
private DataBag getRandomBag(int maxCardinality, int nestingLevel) throws IOException{
int cardinality = rand.nextInt(maxCardinality)+1;
- DataBag b = new DataBag();
+ DataBag b = BagFactory.getInstance().newDefaultBag();
for (int i=0; i<cardinality; i++){
Tuple t = getRandomTuple(nestingLevel+1);
b.add(t);
@@ -168,10 +164,10 @@
DataBag loaded = load.load(new BinStorage(), pigContext);
System.out.println("Done.");
- assertTrue(bag.cardinality() == loaded.cardinality());
+ assertTrue(bag.size() == loaded.size());
- Iterator<Tuple> it1 = bag.content();
- Iterator<Tuple> it2 = loaded.content();
+ Iterator<Tuple> it1 = bag.iterator();
+ Iterator<Tuple> it2 = loaded.iterator();
while (it1.hasNext() && it2.hasNext()) {
Tuple f1 = it1.next();
Tuple f2 = it2.next();
Modified: incubator/pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/Util.java?rev=609048&r1=609047&r2=609048&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/Util.java Fri Jan 4 14:58:20 2008
@@ -19,8 +19,7 @@
import java.io.IOException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
+import org.apache.pig.data.*;
public class Util {
// Helper Functions
@@ -40,7 +39,7 @@
}
static public Tuple loadNestTuple(Tuple t, int[] input) throws IOException {
- DataBag bag = new DataBag();
+ DataBag bag = BagFactory.getInstance().newDefaultBag();
for(int i = 0; i < input.length; i++) {
Tuple f = new Tuple(1);
f.setField(0, input[i]);
@@ -52,7 +51,7 @@
static public Tuple loadNestTuple(Tuple t, int[][] input) throws IOException {
for (int i = 0; i < input.length; i++) {
- DataBag bag = new DataBag();
+ DataBag bag = BagFactory.getInstance().newDefaultBag();
Tuple f = loadFlatTuple(new Tuple(input[i].length), input[i]);
bag.add(f);
t.setField(i, bag);
@@ -62,7 +61,7 @@
static public Tuple loadTuple(Tuple t, String[][] input) throws IOException {
for (int i = 0; i < input.length; i++) {
- DataBag bag = new DataBag();
+ DataBag bag = BagFactory.getInstance().newDefaultBag();
Tuple f = loadTuple(new Tuple(input[i].length), input[i]);
bag.add(f);
t.setField(i, bag);