You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/04/07 23:14:52 UTC
svn commit: r645697 - in /incubator/pig/branches/types: ./
src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/impl/ src/org/apache/pig/impl/io/
src/org/apache/pig/impl/physicalLayer/plans/ src/org/apache/pig/impl/...
Author: gates
Date: Mon Apr 7 14:14:48 2008
New Revision: 645697
URL: http://svn.apache.org/viewvc?rev=645697&view=rev
Log:
Shravan's incr3 patch, with new POLoad and POStore operators.
Added:
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLoad.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POStore.java
incubator/pig/branches/types/test/org/apache/pig/test/TestLoad.java
incubator/pig/branches/types/test/org/apache/pig/test/TestStoreOld.java
incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java
Modified:
incubator/pig/branches/types/build.xml
incubator/pig/branches/types/src/org/apache/pig/PigServer.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java
incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java
incubator/pig/branches/types/src/org/apache/pig/impl/util/JarManager.java
incubator/pig/branches/types/test/org/apache/pig/test/TestFilter.java
incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java
incubator/pig/branches/types/test/org/apache/pig/test/TestStore.java
incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java
Modified: incubator/pig/branches/types/build.xml
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=645697&r1=645696&r2=645697&view=diff
==============================================================================
--- incubator/pig/branches/types/build.xml (original)
+++ incubator/pig/branches/types/build.xml Mon Apr 7 14:14:48 2008
@@ -134,13 +134,16 @@
<target name="compile-sources">
<javac encoding="${build.encoding}" srcdir="${sources}"
includes="**/plan/*.java, **/data/*.java, **/pig/builtin/*.java,
- **/test/TestOperatorPlan.java, **/test/TestBuiltin.java,
+ **/test/utils/*.java, **/test/TestOperatorPlan.java, **/test/TestBuiltin.java,
**/test/TestConstExpr.java, **/test/TestFilter.java, **/test/TestPhyOp.java,
**/test/TestAdd.java, **/test/TestSubtract.java, **/test/TestMultiply.java,
**/test/TestDivide.java, **/test/TestMod.java, **/test/TestGreaterThan.java,
**/test/TestGTOrEqual.java,**/test/TestLessThan.java,**/test/TestLTOrEqual.java,
**/test/TestEqualTo.java,**/test/TestNotEqualTo.java, **/test/TestPOGenerate.java,
- **/test/TestProject.java, **/test/utils/*.java, **/logicalLayer/*.java,
+ **/test/TestProject.java, **/test/TestLoad.java, **/test/TestStore.java,
+ **/test/FakeFSOutputStream.java,
+ **/test/FakeFSInputStream.java, **/test/Util.java,
+ **/logicalLayer/*.java,
**/logicalLayer/schema/*.java, **/physicalLayer/topLevelOperators/*.java,
**/physicalLayer/topLevelOperator/**/*.java, **/physicalLayer/plans/*.java,
**/physicalLayer/Result.java, **/physicalLayer/POStatus.java"
@@ -244,6 +247,8 @@
<include name="**/TestEqualTo.java" />
<include name="**/TestNotEqualTo.java" />
<include name="**/TestPOGenerate.java" />
+ <include name="**/TestLoad.java" />
+ <include name="**/TestStore.java" />
<!--
<include name="**/*Test*.java" />
<exclude name="**/TestLargeFile.java" />
Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=645697&r1=645696&r2=645697&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Mon Apr 7 14:14:48 2008
@@ -34,23 +34,20 @@
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.datastorage.ContainerDescriptor;
import org.apache.pig.backend.datastorage.DataStorage;
-import org.apache.pig.backend.datastorage.DataStorageException;
import org.apache.pig.backend.datastorage.ElementDescriptor;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.backend.executionengine.ExecPhysicalPlan;
import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
-import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.MapReduceLauncher;
+//TODO FIX Need to uncomment this with the right imports
+//import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.MapReduceLauncher;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
-import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
import org.apache.pig.impl.logicalLayer.OperatorKey;
-import org.apache.pig.impl.logicalLayer.parser.ParseException;
-import org.apache.pig.impl.logicalLayer.parser.QueryParser;
import org.apache.pig.impl.util.WrappedIOException;
@@ -233,11 +230,12 @@
// TODO FIX Need to change so that only syntax parsing is done here, and so that logical plan is additive.
// parse the query into a logical plan
LogicalPlan lp = null;
- try {
+// TODO FIX Need to uncomment this with the right logic
+ /*try {
lp = (new LogicalPlanBuilder(pigContext).parse(scope, query, aliases, opTable));
} catch (ParseException e) {
throw (IOException) new IOException(e.getMessage()).initCause(e);
- }
+ }*/
/*
if (lp.getAlias() != null) {
@@ -469,7 +467,9 @@
}
public long totalHadoopTimeSpent() {
- return MapReduceLauncher.totalHadoopTimeSpent;
+// TODO FIX Need to uncomment this with the right logic
+// return MapReduceLauncher.totalHadoopTimeSpent;
+ return 0L;
}
public Map<String, LogicalPlan> getAliases() {
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=645697&r1=645696&r2=645697&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Mon Apr 7 14:14:48 2008
@@ -33,28 +33,20 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobSubmissionProtocol;
import org.apache.hadoop.mapred.JobTracker;
-
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
import org.apache.pig.backend.executionengine.ExecPhysicalPlan;
import org.apache.pig.backend.executionengine.ExecutionEngine;
-import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
import org.apache.pig.backend.hadoop.datastorage.HConfiguration;
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
-import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.MapReduceLauncher;
-import org.apache.pig.builtin.BinStorage;
-import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.OperatorKey;
import org.apache.pig.shock.SSHSocketImplFactory;
@@ -218,7 +210,8 @@
public ExecPhysicalPlan compile(LogicalPlan[] plans,
Properties properties)
throws ExecException {
- if (plans == null) {
+ // TODO FIX Need to uncomment this with the right logic
+ /*if (plans == null) {
throw new ExecException("No Plans to compile");
}
@@ -248,13 +241,14 @@
}
}
- return new MapRedPhysicalPlan(physicalKey, physicalOpTable);
+ return new MapRedPhysicalPlan(physicalKey, physicalOpTable);*/
+ throw new ExecException("Unsupported Operation");
}
public ExecJob execute(ExecPhysicalPlan plan)
throws ExecException {
-
- POMapreduce pom = (POMapreduce) physicalOpTable.get(plan.getRoot());
+ // TODO FIX Need to uncomment this with the right logic
+ /*POMapreduce pom = (POMapreduce) physicalOpTable.get(plan.getRoot());
MapReduceLauncher.initQueryStatus(pom.numMRJobs()); // initialize status, for bookkeeping purposes.
MapReduceLauncher.setConf(this.conf.getConfiguration());
@@ -290,7 +284,8 @@
throw new ExecException(e);
}
- return new HJob(JOB_STATUS.COMPLETED, pigContext, pom.outputFileSpec);
+ return new HJob(JOB_STATUS.COMPLETED, pigContext, pom.outputFileSpec);*/
+ throw new ExecException("Unsupported Operation");
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java?rev=645697&r1=645696&r2=645697&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java Mon Apr 7 14:14:48 2008
@@ -39,7 +39,8 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.pig.Main;
+//TODO FIX Need to uncomment this with the right imports
+//import org.apache.pig.Main;
import org.apache.pig.PigServer.ExecType;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.datastorage.DataStorageException;
@@ -48,10 +49,11 @@
import org.apache.pig.backend.executionengine.ExecutionEngine;
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
-import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.MapReduceLauncher;
-import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce;
-import org.apache.pig.backend.local.executionengine.LocalExecutionEngine;
-import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
+//TODO FIX Need to uncomment this with the right imports
+//import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.MapReduceLauncher;
+//import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce;
+//import org.apache.pig.backend.local.executionengine.LocalExecutionEngine;
+//import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
import org.apache.pig.impl.util.JarManager;
import org.apache.pig.impl.util.WrappedIOException;
@@ -108,8 +110,9 @@
this.execType = execType;
initProperties();
-
- String pigJar = JarManager.findContainingJar(Main.class);
+ // TODO FIX Need to change this after Main starts working
+// String pigJar = JarManager.findContainingJar(Main.class);
+ String pigJar = JarManager.findContainingJar(PigContext.class);
String hadoopJar = JarManager.findContainingJar(FileSystem.class);
if (pigJar != null) {
skipJars.add(pigJar);
@@ -132,7 +135,9 @@
try{
// first read the properties in the jar file
- InputStream pis = MapReduceLauncher.class.getClassLoader().getResourceAsStream("properties");
+ // TODO FIX Need to uncomment this with the right class
+// InputStream pis = MapReduceLauncher.class.getClassLoader().getResourceAsStream("properties");
+ InputStream pis = PigContext.class.getClassLoader().getResourceAsStream("properties");
if (pis != null) {
fileProperties.load(pis);
}
@@ -163,7 +168,8 @@
public void connect() throws ExecException {
try {
switch (execType) {
- case LOCAL:
+// TODO FIX Need to uncomment this with the right logic
+ /*case LOCAL:
{
lfs = new HDataStorage(URI.create("file:///"),
new Configuration());
@@ -172,7 +178,7 @@
executionEngine = new LocalExecutionEngine(this);
}
- break;
+ break;*/
case MAPREDUCE:
{
@@ -228,7 +234,8 @@
public void addJar(URL resource) throws MalformedURLException{
if (resource != null) {
extraJars.add(resource);
- LogicalPlanBuilder.classloader = createCl(null);
+// TODO FIX Need to uncomment this with the right logic
+// LogicalPlanBuilder.classloader = createCl(null);
}
}
@@ -344,7 +351,9 @@
for (int i = 0; i < extraJars.size(); i++) {
urls[i + passedJar] = extraJars.get(i);
}
- return new URLClassLoader(urls, PigMapReduce.class.getClassLoader());
+// TODO FIX Need to uncomment this with the right logic
+// return new URLClassLoader(urls, PigMapReduce.class.getClassLoader());
+ return new URLClassLoader(urls, PigContext.class.getClassLoader());
}
public static String getClassNameFromSpec(String funcSpec){
@@ -368,7 +377,9 @@
for(String prefix: packageImportList) {
Class c;
try {
- c = Class.forName(prefix+name,true, LogicalPlanBuilder.classloader);
+// TODO FIX Need to uncomment this with the right logic
+// c = Class.forName(prefix+name,true, LogicalPlanBuilder.classloader);
+ c = Class.forName(prefix+name,true, PigContext.class.getClassLoader());
return c;
} catch (ClassNotFoundException e) {
} catch (LinkageError e) {}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java?rev=645697&r1=645696&r2=645697&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java Mon Apr 7 14:14:48 2008
@@ -17,7 +17,6 @@
*/
package org.apache.pig.impl.io;
-import java.lang.IllegalArgumentException;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
@@ -26,27 +25,23 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.Random;
-import java.util.Stack;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.Random;
+import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.PigServer.ExecType;
+import org.apache.pig.backend.datastorage.ContainerDescriptor;
import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.datastorage.DataStorageException;
import org.apache.pig.backend.datastorage.ElementDescriptor;
+import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.WrappedIOException;
-import org.apache.pig.backend.datastorage.*;
-import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
-import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigInputFormat;
-import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigInputFormat.PigRecordReader;
-
-import java.util.Properties;
-
public class FileLocalizer {
private static final Log log = LogFactory.getLog(FileLocalizer.class);
@@ -148,13 +143,14 @@
*/
public static InputStream openDFSFile(String fileName) throws IOException{
- PigRecordReader prr = PigInputFormat.PigRecordReader.getPigRecordReader();
+// TODO FIX Need to uncomment this with the right logic
+ /*PigRecordReader prr = PigInputFormat.PigRecordReader.getPigRecordReader();
if (prr == null)
throw new RuntimeException("can't open DFS file while executing locally");
- return openDFSFile(fileName, prr.getJobConf());
-
+ return openDFSFile(fileName, prr.getJobConf());*/
+ throw new IOException("Unsupported Operation");
}
public static InputStream openDFSFile(String fileName, JobConf conf) throws IOException{
@@ -234,6 +230,29 @@
}
return new FileOutputStream(fileSpec,append);
+ }
+ }
+
+ static public boolean delete(String fileSpec, PigContext pigContext) throws IOException{
+ fileSpec = checkDefaultPrefix(pigContext.getExecType(), fileSpec);
+ if (!fileSpec.startsWith(LOCAL_PREFIX)) {
+ init(pigContext);
+ ElementDescriptor elem = pigContext.getDfs().asElement(fileSpec);
+ elem.delete();
+ return true;
+ }
+ else {
+ fileSpec = fileSpec.substring(LOCAL_PREFIX.length());
+ boolean ret = true;
+ // TODO probably this should be replaced with the local file system
+ File f = (new File(fileSpec));
+ // TODO this only deletes the file. Any dirs created as a part of this
+ // are not removed.
+ if (f!=null){
+ ret = f.delete();
+ }
+
+ return ret;
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java?rev=645697&r1=645696&r2=645697&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java Mon Apr 7 14:14:48 2008
@@ -22,10 +22,10 @@
import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
//import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
//import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange;
-//import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
//import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
//import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
-//import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
//import org.apache.pig.impl.physicalLayer.topLevelOperators.StartMap;
import org.apache.pig.impl.plan.PlanVisitor;
@@ -55,14 +55,14 @@
depthFirst();
}
-// public void visitLoad(POLoad ld){
-// //do nothing
-// }
-//
-// public void visitStore(POStore st){
-// //do nothing
-// }
-//
+ public void visitLoad(POLoad ld){
+ //do nothing
+ }
+
+ public void visitStore(POStore st){
+ //do nothing
+ }
+
public void visitFilter(POFilter fl) throws ParseException{
ExprPlanVisitor epv = new ExprPlanVisitor(fl.getPlan());
epv.visit();
Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLoad.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLoad.java?rev=645697&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLoad.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLoad.java Mon Apr 7 14:14:48 2008
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.physicalLayer.topLevelOperators;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+
+/**
+ * The load operator which is used in two ways:
+ * 1) As a local operator it can be used to load files
+ * 2) In the Map Reduce setting, it is used to create jobs
+ * from MapReduce operators which keep the loads and
+ * stores in the Map and Reduce Plans till the job is created
+ *
+ */
+public class POLoad extends PhysicalOperator<PhyPlanVisitor> {
+ // The user defined load function or a default load function
+ LoadFunc loader = null;
+ // The filespec on which the operator is based
+ FileSpec lFile;
+ // The stream used to bind to by the loader
+ InputStream is;
+ // PigContext passed to us by the operator creator
+ PigContext pc;
+ //Indicates whether the loader setup is done or not
+ boolean setUpDone = false;
+
+ private final Log log = LogFactory.getLog(getClass());
+
+ public POLoad(OperatorKey k) {
+ this(k,-1, null);
+ }
+
+
+ public POLoad(OperatorKey k, FileSpec lFile){
+ this(k,-1,lFile);
+ }
+
+ public POLoad(OperatorKey k, int rp, FileSpec lFile) {
+ super(k, rp);
+ }
+
+ /**
+ * Set up the loader by
+ * 1) Instantiating the load func
+ * 2) Opening an input stream to the specified file and
+ * 3) Binding to the input stream
+ * @throws IOException
+ */
+ private void setUp() throws IOException{
+ String filename = lFile.getFileName();
+ loader = (LoadFunc)PigContext.instantiateFuncFromSpec(lFile.getFuncSpec());
+
+ is = FileLocalizer.open(filename, pc);
+
+ loader.bindTo(filename , new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
+ }
+
+ /**
+ * At the end of processing, the inputstream is closed
+ * using this method
+ * @throws IOException
+ */
+ private void tearDown() throws IOException{
+ is.close();
+ }
+
+ /**
+ * The main method used by this operator's successor
+ * to read tuples from the specified file using the
+ * specified load function.
+ *
+ * @return Whatever the loader returns
+ * A null from the loader is indicative
+ * of EOP and hence the tearDown of connection
+ */
+ @Override
+ public Result getNext(Tuple t) throws ExecException {
+ if(!setUpDone && lFile!=null){
+ try {
+ setUp();
+ } catch (IOException e) {
+ ExecException ee = new ExecException("Unable to setup the loader because of the exception: " + e.getMessage());
+ ee.initCause(e);
+ throw ee;
+ }
+ setUpDone = true;
+ }
+ Result res = new Result();
+ try {
+ res.result = loader.getNext();
+ if(res.result==null){
+ res.returnStatus = POStatus.STATUS_EOP;
+ tearDown();
+ }
+ else
+ res.returnStatus = POStatus.STATUS_OK;
+ } catch (IOException e) {
+ log.error("Received error from loader function: " + e);
+ res.returnStatus = POStatus.STATUS_ERR;
+ return res;
+ }
+ return res;
+ }
+
+ @Override
+ public String name() {
+ return "Load - " + mKey.toString();
+ }
+
+ @Override
+ public boolean supportsMultipleInputs() {
+ return false;
+ }
+
+ @Override
+ public boolean supportsMultipleOutputs() {
+ return false;
+ }
+
+ @Override
+ public void visit(PhyPlanVisitor v) throws ParseException {
+ v.visitLoad(this);
+ }
+
+
+ public FileSpec getLFile() {
+ return lFile;
+ }
+
+
+ public void setLFile(FileSpec file) {
+ lFile = file;
+ }
+
+
+ public PigContext getPc() {
+ return pc;
+ }
+
+
+ public void setPc(PigContext pc) {
+ this.pc = pc;
+ }
+
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POStore.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POStore.java?rev=645697&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POStore.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POStore.java Mon Apr 7 14:14:48 2008
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.physicalLayer.topLevelOperators;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+
+import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+
+/**
+ * The store operator which is used in two ways:
+ * 1) As a local operator it can be used to store files
+ * 2) In the Map Reduce setting, it is used to create jobs
+ * from MapReduce operators which keep the loads and
+ * stores in the Map and Reduce Plans till the job is created
+ *
+ */
+public class POStore extends PhysicalOperator<PhyPlanVisitor> {
+ // The user defined store function or a default store function
+ private StoreFunc storer;
+ // The filespec on which the operator is based
+ FileSpec sFile;
+ // The stream that the storer binds to
+ OutputStream os;
+ // PigContext passed to us by the operator creator
+ PigContext pc;
+
+ public POStore(OperatorKey k) {
+ this(k, -1, null);
+ }
+
+ public POStore(OperatorKey k, int rp) {
+ this(k, rp, null);
+ }
+
+ public POStore(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+ super(k, rp, inp);
+ }
+
+ /**
+ * Set up the storer by
+ * 1) Instantiating the store func
+ * 2) Opening an output stream to the specified file and
+ * 3) Binding to the output stream
+ * @throws IOException
+ */
+ private void setUp() throws IOException{
+ storer = (StoreFunc)PigContext.instantiateFuncFromSpec(sFile.getFuncSpec());
+ os = FileLocalizer.create(sFile.getFileName(), pc);
+ storer.bindTo(os);
+ }
+
+ /**
+ * At the end of processing, the outputstream is closed
+ * using this method
+ * @throws IOException
+ */
+ private void tearDown() throws IOException{
+ os.close();
+ }
+
+ /**
+ * To perform cleanup when there is an error.
+ * Uses the FileLocalizer method which only
+ * deletes the file but not the dirs created
+ * with it.
+ * @throws IOException
+ */
+ private void cleanUp() throws IOException{
+ String fName = sFile.getFileName();
+ os.flush();
+ if(FileLocalizer.fileExists(fName,pc))
+ FileLocalizer.delete(fName,pc);
+ }
+
+ /**
+ * The main method used by the local execution engine
+ * to store tuples into the specified file using the
+ * specified store function. One call to this method
+ * retrieves all tuples from its predecessor operator
+ * and stores it into the file till it receives an EOP.
+ *
+ * If there is an error, the cleanUp routine is called
+ * and then the tearDown is called to close the OutputStream
+ *
+ * @return Whatever the predecessor returns
+ * A null from the predecessor is ignored
+ * and processing of further tuples continued
+ */
+ public Result store() throws ExecException{
+ try{
+ setUp();
+ Result res;
+ Tuple inpValue = null;
+ while(true){
+ res = processInput();
+ if(res.returnStatus==POStatus.STATUS_OK)
+ storer.putNext((Tuple)res.result);
+ else if(res.returnStatus==POStatus.STATUS_NULL)
+ continue;
+ else
+ break;
+ }
+ if(res.returnStatus==POStatus.STATUS_EOP){
+ storer.finish();
+ }
+ else{
+ cleanUp();
+ }
+ tearDown();
+ return res;
+ }catch(IOException e){
+ e.printStackTrace();
+ return new Result();
+ }
+ }
+
+ @Override
+ public String name() {
+ return "Store - " + mKey.toString();
+ }
+
+ @Override
+ public boolean supportsMultipleInputs() {
+ return false;
+ }
+
+ @Override
+ public boolean supportsMultipleOutputs() {
+ return false;
+ }
+
+ public StoreFunc getStorer() {
+ return storer;
+ }
+
+
+
+ @Override
+ public void visit(PhyPlanVisitor v) throws ParseException {
+ v.visitStore(this);
+ }
+
+ public FileSpec getSFile() {
+ return sFile;
+ }
+
+ public void setSFile(FileSpec file) {
+ sFile = file;
+ }
+
+ public PigContext getPc() {
+ return pc;
+ }
+
+ public void setPc(PigContext pc) {
+ this.pc = pc;
+ }
+
+}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/util/JarManager.java?rev=645697&r1=645696&r2=645697&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/util/JarManager.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/util/JarManager.java Mon Apr 7 14:14:48 2008
@@ -38,7 +38,7 @@
import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry;
-import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce;
+//import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce;
import org.apache.pig.impl.PigContext;
@@ -97,9 +97,12 @@
public static void createJar(OutputStream os, List<String> funcs, PigContext pigContext) throws ClassNotFoundException, IOException {
Vector<JarListEntry> jarList = new Vector<JarListEntry>();
for(String toSend: pigPackagesToSend) {
- addContainingJar(jarList, PigMapReduce.class, toSend, pigContext);
+// TODO FIX Need to uncomment this with the right logic
+// addContainingJar(jarList, PigMapReduce.class, toSend, pigContext);
}
- ClassLoader pigClassLoader = PigMapReduce.class.getClassLoader();
+// TODO FIX Need to uncomment this with the right logic
+// ClassLoader pigClassLoader = PigMapReduce.class.getClassLoader();
+ ClassLoader pigClassLoader = PigContext.class.getClassLoader();
for (String func: funcs) {
Class clazz = pigContext.getClassForAlias(func);
@@ -148,7 +151,9 @@
for (int i = 0; i < pigContext.extraJars.size(); i++) {
urls[i + passedJar] = new URL("file:" + pigContext.extraJars.get(i));
}
- return new URLClassLoader(urls, PigMapReduce.class.getClassLoader());
+// TODO FIX Need to uncomment this with the right logic
+// return new URLClassLoader(urls, PigMapReduce.class.getClassLoader());
+ return new URLClassLoader(urls, PigContext.class.getClassLoader());
}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestFilter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestFilter.java?rev=645697&r1=645696&r2=645697&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestFilter.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestFilter.java Mon Apr 7 14:14:48 2008
@@ -37,6 +37,7 @@
import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
import org.apache.pig.test.utils.GenPhyOp;
import org.apache.pig.test.utils.GenRandomData;
+import org.apache.pig.test.utils.TestHelper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -75,15 +76,6 @@
public void tearDown() throws Exception {
}
- private boolean bagContains(DataBag db, Tuple t) {
- Iterator<Tuple> iter = db.iterator();
- for (Tuple tuple : db) {
- if (tuple.compareTo(t) == 0)
- return true;
- }
- return false;
- }
-
@Test
public void testGetNextTuple() throws ExecException, IOException {
pass.attachInput(t);
@@ -99,7 +91,7 @@
break;
assertEquals(POStatus.STATUS_OK, res.returnStatus);
Tuple output = (Tuple) res.result;
- assertEquals(true, bagContains(inp, output));
+ assertEquals(true, TestHelper.bagContains(inp, output));
assertEquals(true, (Integer) ((Tuple) res.result).get(1) > 50);
}
}
Added: incubator/pig/branches/types/test/org/apache/pig/test/TestLoad.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLoad.java?rev=645697&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLoad.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLoad.java Mon Apr 7 14:14:48 2008
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.*;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+
+import org.apache.hadoop.fs.BufferedFSInputStream;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DefaultBagFactory;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
+import org.apache.pig.test.utils.GenPhyOp;
+import org.apache.pig.test.utils.TestHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test for the {@link POLoad} physical operator. It points the operator
+ * at /etc/passwd using PigStorage(':') and compares every tuple it produces
+ * with the same file parsed by hand in {@link #setUp()}.
+ */
+public class TestLoad {
+    FileSpec inpFSpec;
+    POLoad ld;
+    PigContext pc;
+    DataBag inpDB;
+
+    static MiniCluster cluster = MiniCluster.buildCluster();
+
+    /**
+     * Configures the load operator and builds the bag of expected tuples by
+     * reading /etc/passwd directly and splitting each line on ':'.
+     */
+    @Before
+    public void setUp() throws Exception {
+
+        inpFSpec = new FileSpec("file:////etc/passwd",PigStorage.class.getName()+"(':')");
+        pc = new PigContext();
+        pc.connect();
+
+        ld = GenPhyOp.topLoadOp();
+        ld.setLFile(inpFSpec);
+        ld.setPc(pc);
+
+        inpDB = DefaultBagFactory.getInstance().newDefaultBag();
+        BufferedReader br = new BufferedReader(new FileReader("/etc/passwd"));
+        try {
+            for(String line = br.readLine();line!=null;line=br.readLine()){
+                // limit -1 keeps trailing empty fields so the field count matches the line
+                String[] flds = line.split(":",-1);
+                Tuple t = new DefaultTuple();
+                for (String fld : flds) {
+                    // the expected tuples carry null in place of an empty field
+                    t.append((fld.compareTo("")!=0 ? new DataByteArray(fld.getBytes()) : null));
+                }
+                inpDB.add(t);
+            }
+        } finally {
+            // release the file handle even when reading or parsing fails
+            br.close();
+        }
+    }
+
+
+
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    /**
+     * Pulls tuples from the operator until STATUS_EOP, checking that each one
+     * is present in the expected bag and that the total count matches.
+     */
+    @Test
+    public void testGetNextTuple() throws ExecException {
+        Tuple t=null;
+        int size = 0;
+        for(Result res = ld.getNext(t);res.returnStatus!=POStatus.STATUS_EOP;res=ld.getNext(t)){
+            // fail fast on any non-OK status instead of inspecting a result that may not be set
+            assertEquals(POStatus.STATUS_OK, res.returnStatus);
+            assertEquals(true, TestHelper.bagContains(inpDB, (Tuple)res.result));
+            ++size;
+        }
+        // comparing the numbers directly reports both values when the counts differ
+        assertEquals(inpDB.size(), size);
+    }
+
+}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java?rev=645697&r1=645696&r2=645697&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java Mon Apr 7 14:14:48 2008
@@ -32,6 +32,7 @@
import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
import org.apache.pig.test.utils.GenPhyOp;
import org.apache.pig.test.utils.GenRandomData;
+import org.apache.pig.test.utils.TestHelper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -70,15 +71,6 @@
}
}
- private boolean bagContains(DataBag db, Tuple t) {
- Iterator<Tuple> iter = db.iterator();
- for (Tuple tuple : db) {
- if (tuple.compareTo(t) == 0)
- return true;
- }
- return false;
- }
-
@Test
public void testGetNextTuple() throws IOException, ExecException {
proj.attachInput(t);
@@ -91,7 +83,7 @@
res = proj.getNext(t);
if (res.returnStatus == POStatus.STATUS_EOP)
break;
- if (!bagContains(inpBag, (Tuple) res.result)) {
+ if (!TestHelper.bagContains(inpBag, (Tuple) res.result)) {
contains = false;
break;
}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestStore.java?rev=645697&r1=645696&r2=645697&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestStore.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestStore.java Mon Apr 7 14:14:48 2008
@@ -17,100 +17,88 @@
*/
package org.apache.pig.test;
-import java.io.File;
-import java.io.PrintWriter;
-import java.util.Iterator;
+import static org.junit.Assert.assertEquals;
-import org.apache.pig.PigServer;
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultTuple;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
+import org.apache.pig.test.utils.GenPhyOp;
+import org.apache.pig.test.utils.GenRandomData;
+import org.apache.pig.test.utils.TestHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestStore {
+ POStore st;
+ FileSpec fSpec;
+ DataBag inpDB;
+ static MiniCluster cluster = MiniCluster.buildCluster();
+ PigContext pc;
+
+    /**
+     * Builds the store operator under test: it writes to /tmp/storeTest.txt
+     * through PigStorage(':') and takes its input from a projection of
+     * column 0 of a tuple that wraps a randomly generated bag.
+     */
+    @Before
+    public void setUp() throws Exception {
+        st = GenPhyOp.topStoreOp();
+        String storeFunc = PigStorage.class.getName()+"(':')";
+        fSpec = new FileSpec("file:////tmp/storeTest.txt", storeFunc);
+        st.setSFile(fSpec);
+        pc = new PigContext();
+        pc.connect();
+        st.setPc(pc);
+
+        // the projection is the store operator's only input
+        POProject project = GenPhyOp.exprProject();
+        project.setColumn(0);
+        project.setResultType(DataType.TUPLE);
+        project.setOverloaded(true);
+        List<PhysicalOperator> storeInputs = new ArrayList<PhysicalOperator>();
+        storeInputs.add(project);
+        st.setInputs(storeInputs);
+
+        // wrap the random bag in a single-field tuple and hand it to the projection
+        inpDB = GenRandomData.genRandSmallTupDataBag(new Random(), 10, 100);
+        Tuple wrapper = new DefaultTuple();
+        wrapper.append(inpDB);
+        project.attachInput(wrapper);
+    }
-import junit.framework.TestCase;
-
-public class TestStore extends TestCase {
-
- private String initString = "mapreduce";
- MiniCluster cluster = MiniCluster.buildCluster();
- private int LOOP_COUNT = 1024;
-
- String fileName;
- String tmpFile1, tmpFile2;
- PigServer pig;
-
- public void testSingleStore() throws Exception{
- pig.registerQuery("A = load " + fileName + ";");
-
- pig.store("A", tmpFile1);
-
- pig.registerQuery("B = load " + tmpFile1 + ";");
- Iterator<Tuple> iter = pig.openIterator("B");
-
- int i =0;
- while (iter.hasNext()){
- Tuple t = iter.next();
- assertEquals(DataType.toInteger(t.get(0)).intValue(),i);
- assertEquals(DataType.toInteger(t.get(1)).intValue(),i);
- i++;
- }
+ @After
+ public void tearDown() throws Exception {
}
-
- public void testMultipleStore() throws Exception{
- pig.registerQuery("A = load " + fileName + ";");
-
- pig.store("A", tmpFile1);
-
- pig.registerQuery("B = foreach (group A by $0) generate $0, SUM($1);");
- pig.store("B", tmpFile2);
- pig.registerQuery("C = load " + tmpFile2 + ";");
- Iterator<Tuple> iter = pig.openIterator("C");
-
- int i =0;
- while (iter.hasNext()){
- Tuple t = iter.next();
- i++;
+
+    /**
+     * Runs the store operator, then reads /tmp/storeTest.txt back line by line
+     * and checks that every stored record is present in the input bag and that
+     * the number of records matches the size of the bag.
+     */
+    @Test
+    public void testStore() throws ExecException, IOException {
+        Result res = st.store();
+        assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+
+        int size = 0;
+        BufferedReader br = new BufferedReader(new FileReader("/tmp/storeTest.txt"));
+        // the reader is closed in the finally clause below so a failed assertion cannot leak it
+        try {
+        for(String line=br.readLine();line!=null;line=br.readLine()){
+            String[] flds = line.split(":",-1);
+            Tuple t = new DefaultTuple();
+            // an empty first field is compared as null
+            t.append(flds[0].compareTo("")!=0 ? flds[0] : null);
+            t.append(Integer.parseInt(flds[1]));
+            assertEquals(true, TestHelper.bagContains(inpDB, t));
+            ++size;
}
-
- assertEquals(LOOP_COUNT, i);
-
- }
-
- public void testStoreWithMultipleMRJobs() throws Exception{
- pig.registerQuery("A = load " + fileName + ";");
- pig.registerQuery("B = foreach (group A by $0) generate $0, SUM($1);");
- pig.registerQuery("C = foreach (group B by $0) generate $0, SUM($1);");
- pig.registerQuery("D = foreach (group C by $0) generate $0, SUM($1);");
-
- pig.store("D", tmpFile2);
- pig.registerQuery("E = load " + tmpFile2 + ";");
- Iterator<Tuple> iter = pig.openIterator("E");
-
- int i =0;
- while (iter.hasNext()){
- Tuple t = iter.next();
- i++;
- }
-
- assertEquals(LOOP_COUNT, i);
-
+        } finally {
+            br.close();
+        }
+        assertEquals(true, size==inpDB.size());
+        FileLocalizer.delete(fSpec.getFileName(), pc);
}
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- File f = File.createTempFile("tmp", "");
- PrintWriter pw = new PrintWriter(f);
- for (int i=0;i<LOOP_COUNT; i++){
- pw.println(i + "\t" + i);
- }
- pw.close();
- pig = new PigServer(initString);
- fileName = "'" + FileLocalizer.hadoopify(f.toString(), pig.getPigContext()) + "'";
- tmpFile1 = "'" + FileLocalizer.getTemporaryPath(null, pig.getPigContext()).toString() + "'";
- tmpFile2 = "'" + FileLocalizer.getTemporaryPath(null, pig.getPigContext()).toString() + "'";
- f.delete();
- }
-
}
Added: incubator/pig/branches/types/test/org/apache/pig/test/TestStoreOld.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestStoreOld.java?rev=645697&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestStoreOld.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestStoreOld.java Mon Apr 7 14:14:48 2008
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.util.Iterator;
+
+import org.apache.pig.PigServer;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.FileLocalizer;
+
+import junit.framework.TestCase;
+
+/**
+ * Store tests driven end to end through {@link PigServer}, created with the
+ * "mapreduce" init string. Each test loads a generated file of LOOP_COUNT
+ * lines of the form "i&lt;TAB&gt;i", stores one or more relations and reads
+ * the stored output back.
+ */
+public class TestStoreOld extends TestCase {
+
+    private String initString = "mapreduce";
+    MiniCluster cluster = MiniCluster.buildCluster();
+    private int LOOP_COUNT = 1024;
+
+    String fileName;
+    String tmpFile1, tmpFile2;
+    PigServer pig;
+
+    /**
+     * Stores the loaded relation as is and checks that the tuples read back
+     * carry the running index in both columns.
+     */
+    public void testSingleStore() throws Exception{
+        pig.registerQuery("A = load " + fileName + ";");
+
+        pig.store("A", tmpFile1);
+
+        pig.registerQuery("B = load " + tmpFile1 + ";");
+        Iterator<Tuple> iter = pig.openIterator("B");
+
+        int i =0;
+        while (iter.hasNext()){
+            Tuple t = iter.next();
+            // JUnit expects (expected, actual); this order keeps failure messages truthful
+            assertEquals(i, DataType.toInteger(t.get(0)).intValue());
+            assertEquals(i, DataType.toInteger(t.get(1)).intValue());
+            i++;
+        }
+    }
+
+    /**
+     * Stores the raw relation and a grouped relation derived from it, then
+     * checks that the grouped output holds LOOP_COUNT tuples.
+     */
+    public void testMultipleStore() throws Exception{
+        pig.registerQuery("A = load " + fileName + ";");
+
+        pig.store("A", tmpFile1);
+
+        pig.registerQuery("B = foreach (group A by $0) generate $0, SUM($1);");
+        pig.store("B", tmpFile2);
+        pig.registerQuery("C = load " + tmpFile2 + ";");
+
+        assertEquals(LOOP_COUNT, countTuples(pig.openIterator("C")));
+
+    }
+
+    /**
+     * Stores the result of three chained group/SUM statements and checks that
+     * the stored output holds LOOP_COUNT tuples.
+     */
+    public void testStoreWithMultipleMRJobs() throws Exception{
+        pig.registerQuery("A = load " + fileName + ";");
+        pig.registerQuery("B = foreach (group A by $0) generate $0, SUM($1);");
+        pig.registerQuery("C = foreach (group B by $0) generate $0, SUM($1);");
+        pig.registerQuery("D = foreach (group C by $0) generate $0, SUM($1);");
+
+        pig.store("D", tmpFile2);
+        pig.registerQuery("E = load " + tmpFile2 + ";");
+
+        assertEquals(LOOP_COUNT, countTuples(pig.openIterator("E")));
+
+    }
+
+    /** Drains the iterator and returns how many tuples it produced. */
+    private static int countTuples(Iterator<Tuple> iter) {
+        int count = 0;
+        while (iter.hasNext()){
+            iter.next();
+            count++;
+        }
+        return count;
+    }
+
+    /**
+     * Writes the LOOP_COUNT-line input file, creates the PigServer, copies the
+     * file via FileLocalizer.hadoopify and reserves two temporary output paths.
+     * The local temp file is removed on every exit path.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        File f = File.createTempFile("tmp", "");
+        try {
+            PrintWriter pw = new PrintWriter(f);
+            try {
+                for (int i=0;i<LOOP_COUNT; i++){
+                    pw.println(i + "\t" + i);
+                }
+            } finally {
+                // close (and flush) the writer even if writing fails
+                pw.close();
+            }
+            pig = new PigServer(initString);
+            fileName = "'" + FileLocalizer.hadoopify(f.toString(), pig.getPigContext()) + "'";
+            tmpFile1 = "'" + FileLocalizer.getTemporaryPath(null, pig.getPigContext()).toString() + "'";
+            tmpFile2 = "'" + FileLocalizer.getTemporaryPath(null, pig.getPigContext()).toString() + "'";
+        } finally {
+            // the local copy is no longer needed once it has been handed to FileLocalizer
+            f.delete();
+        }
+    }
+
+}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java?rev=645697&r1=645696&r2=645697&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java Mon Apr 7 14:14:48 2008
@@ -26,10 +26,10 @@
import org.apache.pig.impl.physicalLayer.topLevelOperators.POFilter;
//import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
//import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange;
-//import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
//import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
//import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
-//import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
//import org.apache.pig.impl.physicalLayer.topLevelOperators.StartMap;
import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ConstantExpression;
import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
@@ -94,10 +94,10 @@
// return ret;
// }
//
-// public static POLoad topLoadOp(){
-// POLoad ret = new POLoad(new OperatorKey("",r.nextLong()));
-// return ret;
-// }
+    /**
+     * Creates a POLoad whose OperatorKey is built from an empty string and
+     * the next long drawn from r.
+     */
+    public static POLoad topLoadOp(){
+        return new POLoad(new OperatorKey("",r.nextLong()));
+    }
public static POFilter topFilterOp(){
POFilter ret = new POFilter(new OperatorKey("",r.nextLong()));
@@ -132,14 +132,14 @@
}
public static POFilter topFilterOpWithProj(int col, int rhsVal) throws IOException{
- POFilter ret = new POFilter(new OperatorKey("",r.nextLong()));
-
- POProject proj = exprProject();
- proj.setResultType(DataType.INTEGER);
- proj.setColumn(col);
- proj.setOverloaded(false);
-
- ConstantExpression ce2 = GenPhyOp.exprConst();
+ POFilter ret = new POFilter(new OperatorKey("",r.nextLong()));
+
+ POProject proj = exprProject();
+ proj.setResultType(DataType.INTEGER);
+ proj.setColumn(col);
+ proj.setOverloaded(false);
+
+ ConstantExpression ce2 = GenPhyOp.exprConst();
ce2.setValue(rhsVal);
GreaterThanExpr gr = GenPhyOp.compGreaterThanExpr();
@@ -170,10 +170,10 @@
// return ret;
// }
//
-// public static POStore topStoreOp(){
-// POStore ret = new POStore(new OperatorKey("",r.nextLong()));
-// return ret;
-// }
+    /**
+     * Creates a POStore whose OperatorKey is built from an empty string and
+     * the next long drawn from r.
+     */
+    public static POStore topStoreOp(){
+        return new POStore(new OperatorKey("",r.nextLong()));
+    }
//
// public static StartMap topStartMapOp(){
// StartMap ret = new StartMap(new OperatorKey("",r.nextLong()));
Added: incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java?rev=645697&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java Mon Apr 7 14:14:48 2008
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.utils;
+
+import java.util.Iterator;
+
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Static helper methods shared by the unit tests.
+ */
+public class TestHelper {
+    /**
+     * Tells whether a bag holds a tuple that compares equal to the given one.
+     *
+     * @param db the bag to scan
+     * @param t the tuple to look for
+     * @return true if some tuple in db satisfies compareTo(t) == 0,
+     *         false otherwise (including when the bag yields no tuples)
+     */
+    public static boolean bagContains(DataBag db, Tuple t) {
+        for (Tuple tuple : db) {
+            if (tuple.compareTo(t) == 0)
+                return true;
+        }
+        return false;
+    }
+}