You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zj...@apache.org on 2010/05/12 18:43:13 UTC
svn commit: r943578 - in /hadoop/pig/trunk: CHANGES.txt
src/org/apache/pig/PigServer.java test/org/apache/pig/test/TestPigServer.java
test/org/apache/pig/test/Util.java
Author: zjffdu
Date: Wed May 12 16:43:13 2010
New Revision: 943578
URL: http://svn.apache.org/viewvc?rev=943578&view=rev
Log:
PIG-1410: Make PigServer able to handle script files with parameters
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/PigServer.java
hadoop/pig/trunk/test/org/apache/pig/test/TestPigServer.java
hadoop/pig/trunk/test/org/apache/pig/test/Util.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=943578&r1=943577&r2=943578&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed May 12 16:43:13 2010
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-1410: Make PigServer able to handle script files with parameters (zjffdu)
+
PIG-1406: Allow to run shell commands from grunt (zjffdu)
PIG-1398: Marking Pig interfaces for org.apache.pig.data package (gates)
Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=943578&r1=943577&r2=943578&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Wed May 12 16:43:13 2010
@@ -17,11 +17,16 @@
*/
package org.apache.pig;
+import java.io.BufferedReader;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.io.PrintStream;
+import java.io.StringReader;
+import java.io.StringWriter;
import java.net.URL;
import java.util.ArrayList;
import java.util.LinkedList;
@@ -98,6 +103,7 @@ import org.apache.pig.impl.logicalLayer.
import org.apache.pig.pen.ExampleGenerator;
import org.apache.pig.impl.util.LogUtils;
import org.apache.pig.tools.grunt.GruntParser;
+import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
/**
@@ -493,22 +499,70 @@ public class PigServer {
* @throws IOException
*/
public void registerScript(String fileName) throws IOException {
+ registerScript(fileName, null, null);
+ }
+
+ /**
+ * Register a pig script file. The parameters in the file will be substituted with the values in params
+ * @param fileName pig script file
+ * @param params the key is the parameter name, and the value is the parameter value
+ * @throws IOException
+ */
+ public void registerScript(String fileName, Map<String,String> params) throws IOException {
+ registerScript(fileName, params, null);
+ }
+
+ /**
+ * Register a pig script file. The parameters in the file will be substituted with the values in the parameter files
+ * @param fileName pig script file
+ * @param paramsFiles files which have the parameter setting
+ * @throws IOException
+ */
+ public void registerScript(String fileName, List<String> paramsFiles) throws IOException {
+ registerScript(fileName, null, paramsFiles);
+ }
+
+ /**
+ * Register a pig script file. The parameters in the file will be substituted with the values in the map and the parameter files
+ * The values in params Map will override the value in parameter file if they have the same parameter
+ * @param fileName pig script
+ * @param params the key is the parameter name, and the value is the parameter value
+ * @param paramsFiles files which have the parameter setting
+ * @throws IOException
+ */
+ public void registerScript(String fileName, Map<String,String> params,List<String> paramsFiles) throws IOException {
try {
- GruntParser grunt = new GruntParser(new FileReader(new File(fileName)));
+ // transform the map type to list type which can been accepted by ParameterSubstitutionPreprocessor
+ List<String> paramList = new ArrayList<String>();
+ if (params!=null){
+ for (Map.Entry<String, String> entry:params.entrySet()){
+ paramList.add(entry.getKey()+"="+entry.getValue());
+ }
+ }
+
+ // do parameter substitution
+ ParameterSubstitutionPreprocessor psp = new ParameterSubstitutionPreprocessor(50);
+ StringWriter writer = new StringWriter();
+ psp.genSubstitutedFile(new BufferedReader(new InputStreamReader(new FileInputStream(fileName))),
+ writer,
+ paramList.size() > 0 ? paramList.toArray(new String[0]) : null,
+ paramsFiles!=null ? paramsFiles.toArray(new String[0]) : null);
+
+ GruntParser grunt = new GruntParser(new StringReader(writer.toString()));
grunt.setInteractive(false);
grunt.setParams(this);
grunt.parseStopOnError(true);
} catch (FileNotFoundException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ log.error(e.getLocalizedMessage());
throw new IOException(e.getCause());
} catch (org.apache.pig.tools.pigscript.parser.ParseException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ log.error(e.getLocalizedMessage());
+ throw new IOException(e.getCause());
+ } catch (org.apache.pig.tools.parameters.ParseException e) {
+ log.error(e.getLocalizedMessage());
throw new IOException(e.getCause());
}
}
-
/**
* Intended to be used by unit tests only.
* Print a list of all aliases in in the current Pig Latin script. Output is written to
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPigServer.java?rev=943578&r1=943577&r2=943578&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPigServer.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPigServer.java Wed May 12 16:43:13 2010
@@ -18,37 +18,40 @@
package org.apache.pig.test;
-import java.io.File;
-import java.io.IOException;
-import java.io.FileOutputStream;
-import java.io.OutputStreamWriter;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.DataInputStream;
+import java.io.File;
import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
import java.io.PrintStream;
-import java.util.List;
-import java.util.Iterator;
+import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
-import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.TestCase;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
-
+import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
-import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import junit.framework.TestCase;
-
@RunWith(JUnit4.class)
public class TestPigServer extends TestCase {
private PigServer pig = null;
@@ -570,4 +573,48 @@ public class TestPigServer extends TestC
}
fileWithStdOutContents.close();
}
+
+ @Test
+ public void testParamSubstitution() throws Exception{
+ // using params map
+ PigServer pig=new PigServer(ExecType.LOCAL);
+ Map<String,String> params=new HashMap<String, String>();
+ params.put("input", "test/org/apache/pig/test/data/passwd");
+ File scriptFile=Util.createFile(new String[]{"a = load '$input' using PigStorage(':');"});
+ pig.registerScript(scriptFile.getAbsolutePath(),params);
+ assertAliasMatchesFile(pig, "a", "test/org/apache/pig/test/data/passwd");
+
+ // using param file
+ pig=new PigServer(ExecType.LOCAL);
+ List<String> paramFile=new ArrayList<String>();
+ paramFile.add(Util.createFile(new String[]{"input=test/org/apache/pig/test/data/passwd2"}).getAbsolutePath());
+ pig.registerScript(scriptFile.getAbsolutePath(),paramFile);
+ assertAliasMatchesFile(pig, "a", "test/org/apache/pig/test/data/passwd2");
+
+ // using both param value and param file, param value should override param file
+ pig=new PigServer(ExecType.LOCAL);
+ pig.registerScript(scriptFile.getAbsolutePath(),params,paramFile);
+ assertAliasMatchesFile(pig, "a", "test/org/apache/pig/test/data/passwd");
+ }
+
+ /**
+ * Asserts that the alias yields exactly as many rows as the ':'-delimited file,
+ * with each row's first field equal to the first field of the corresponding file line.
+ * @param server PigServer on which the alias is registered
+ * @param alias alias to open
+ * @param file local file holding the expected rows
+ */
+ private void assertAliasMatchesFile(PigServer server, String alias, String file) throws Exception {
+ List<Tuple> expectedTuples=Util.readFile2TupleList(file, ":");
+ Iterator<Tuple> iter=server.openIterator(alias);
+ int index=0;
+ while(iter.hasNext()){
+ Tuple tuple=iter.next();
+ assertTrue("alias " + alias + " returned more rows than " + file, index < expectedTuples.size());
+ assertEquals(expectedTuples.get(index).get(0).toString(), tuple.get(0).toString());
+ index++;
+ }
+ // without this check an alias returning no rows would pass vacuously
+ assertEquals("row count of alias " + alias, expectedTuples.size(), index);
+ }
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/Util.java?rev=943578&r1=943577&r2=943578&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/Util.java Wed May 12 16:43:13 2010
@@ -23,6 +23,7 @@ import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
@@ -35,6 +36,7 @@ import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.StringReader;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -593,5 +595,16 @@ public class Util {
System.err.println(out.toString());
}
-
+ /**
+ * Reads a local text file into a list of tuples, one tuple per line, built from the line's fields.
+ * Each line is split with {@link String#split(String)}, so delimiter is a regular expression
+ * and trailing empty fields are dropped.
+ * @param file path of the file to read
+ * @param delimiter field separator, interpreted as a regular expression
+ * @return the tuples in file order
+ * @throws IOException if the file cannot be opened or read
+ */
+ public static List<Tuple> readFile2TupleList(String file, String delimiter) throws IOException{
+ List<Tuple> tuples=new ArrayList<Tuple>();
+ BufferedReader reader=new BufferedReader(new InputStreamReader(new FileInputStream(file)));
+ try {
+ String line;
+ while((line=reader.readLine())!=null){
+ String[] tokens=line.split(delimiter);
+ tuples.add(TupleFactory.getInstance().newTuple(Arrays.asList(tokens)));
+ }
+ } finally {
+ // close even when readLine throws, so the file handle is not leaked
+ reader.close();
+ }
+ return tuples;
+ }
}