You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@pig.apache.org by ro...@apache.org on 2015/10/04 20:22:33 UTC
svn commit: r1706711 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/scripting/BoundScript.java
test/org/apache/pig/test/TestPigServerLocal.java
Author: rohini
Date: Sun Oct 4 18:22:32 2015
New Revision: 1706711
URL: http://svn.apache.org/viewvc?rev=1706711&view=rev
Log:
PIG-4670: Embedded Python scripts still parse line by line (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/scripting/BoundScript.java
pig/trunk/test/org/apache/pig/test/TestPigServerLocal.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1706711&r1=1706710&r2=1706711&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Oct 4 18:22:32 2015
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4670: Embedded Python scripts still parse line by line (rohini)
+
PIG-4663: HBaseStorage should allow the MaxResultsPerColumnFamily limit to avoid memory or scan timeout issues (pmazak via rohini)
PIG-4673: Built In UDF - REPLACE_MULTI : For a given string, search and replace all occurrences
Modified: pig/trunk/src/org/apache/pig/scripting/BoundScript.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/BoundScript.java?rev=1706711&r1=1706710&r2=1706711&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/BoundScript.java (original)
+++ pig/trunk/src/org/apache/pig/scripting/BoundScript.java Sun Oct 4 18:22:32 2015
@@ -31,8 +31,8 @@ import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.pig.PigServer;
import org.apache.pig.PigRunner.ReturnCode;
+import org.apache.pig.PigServer;
import org.apache.pig.impl.PigContext;
import org.apache.pig.tools.grunt.GruntParser;
import org.apache.pig.tools.pigscript.parser.ParseException;
@@ -45,41 +45,41 @@ import org.apache.pig.tools.pigstats.Scr
* This represents an instance of a bound pipeline.
*/
public class BoundScript {
-
+
private static final Log LOG = LogFactory.getLog(BoundScript.class);
-
+
private List<String> queries = new ArrayList<String>();
private String name = null;
-
+
private ScriptPigContext scriptContext = null;
-
+
BoundScript(String query, ScriptPigContext scriptContext, String name) {
this.queries.add(query);
this.scriptContext = scriptContext;
- this.name = name;
+ this.name = name;
}
-
+
BoundScript(List<String> queries, ScriptPigContext scriptContext,
String name) {
this.queries.addAll(queries);
this.scriptContext = ScriptPigContext.get();
- this.name = name;
+ this.name = name;
}
-
+
/**
- * Run a pipeline on Hadoop.
- * If there are no stores in this pipeline then nothing will be run.
+ * Run a pipeline on Hadoop.
+ * If there are no stores in this pipeline then nothing will be run.
* @return {@link PigStats}, null if there is no bound query to run.
* @throws IOException
*/
public PigStats runSingle() throws IOException {
return runSingle((Properties)null);
}
-
+
/**
- * Run a pipeline on Hadoop.
- * If there are no stores in this pipeline then nothing will be run.
+ * Run a pipeline on Hadoop.
+ * If there are no stores in this pipeline then nothing will be run.
* @param prop Map of properties that Pig should set when running the script.
* This is intended for use with scripting languages that do not support
* the Properties object.
@@ -98,14 +98,14 @@ public class BoundScript {
if (prop != null) {
scriptContext.getPigContext().getProperties().putAll(prop);
}
- PigStats ret = exec(queries.get(0));
+ PigStats ret = exec(queries.get(0));
setPigStats(ret);
return ret;
}
-
+
/**
- * Run a pipeline on Hadoop.
- * If there are no stores in this pipeline then nothing will be run.
+ * Run a pipeline on Hadoop.
+ * If there are no stores in this pipeline then nothing will be run.
* @param propfile File with properties that Pig should set when running the script.
* @return {@link PigStats}, null if there is no bound query to run.
* @throws IOException
@@ -123,17 +123,17 @@ public class BoundScript {
}
/**
- * Run multiple instances of bound pipeline on Hadoop in parallel.
- * If there are no stores in this pipeline then nothing will be run.
- * Bind is called first with the list of maps of variables to bind.
+ * Run multiple instances of bound pipeline on Hadoop in parallel.
+ * If there are no stores in this pipeline then nothing will be run.
+ * Bind is called first with the list of maps of variables to bind.
* @return a list of {@link PigStats}, one for each map of variables passed
* to bind.
* @throws IOException
- */
- public List<PigStats> run() throws IOException {
+ */
+ public List<PigStats> run() throws IOException {
return run((Properties)null);
}
-
+
/**
* Run multiple instances of bound pipeline on Hadoop in parallel.
* @param prop Map of properties that Pig should set when running the script.
@@ -148,7 +148,7 @@ public class BoundScript {
if (queries.isEmpty()) {
LOG.info("No bound query to run.");
return stats;
- }
+ }
if (queries.size() == 1) {
PigStats ps = runSingle();
stats.add(ps);
@@ -157,20 +157,20 @@ public class BoundScript {
if (prop != null) {
scriptContext.getPigContext().getProperties().putAll(prop);
}
- List<PigProgressNotificationListener> listeners
+ List<PigProgressNotificationListener> listeners
= ScriptState.get().getAllListeners();
- SyncProgressNotificationAdaptor adaptor
+ SyncProgressNotificationAdaptor adaptor
= new SyncProgressNotificationAdaptor(listeners);
List<Future<PigStats>> futures = new ArrayList<Future<PigStats>>();
ExecutorService executor = Executors.newFixedThreadPool(queries.size());
- for (int i=0; i<queries.size(); i++) {
+ for (int i=0; i<queries.size(); i++) {
Properties props = new Properties();
props.putAll(scriptContext.getPigContext().getProperties());
PigContext ctx = new PigContext(scriptContext.getPigContext().getExecType(), props);
MyCallable worker = new MyCallable(queries.get(i), ctx, adaptor);
Future<PigStats> submit = executor.submit(worker);
futures.add(submit);
- }
+ }
for (Future<PigStats> future : futures) {
try {
stats.add(future.get());
@@ -178,23 +178,23 @@ public class BoundScript {
LOG.error("Pig pipeline failed to complete", e);
PigStatsUtil.getEmptyPigStats();
PigStatsUtil.setErrorMessage(e.getMessage());
- PigStats failed = PigStatsUtil.getPigStats(ReturnCode.FAILURE);
+ PigStats failed = PigStatsUtil.getPigStats(ReturnCode.FAILURE);
stats.add(failed);
} catch (ExecutionException e) {
LOG.error("Pig pipeline failed to complete", e);
PigStatsUtil.getEmptyPigStats();
- PigStatsUtil.setErrorMessage(e.getMessage());
- PigStats failed = PigStatsUtil.getPigStats(ReturnCode.FAILURE);
+ PigStatsUtil.setErrorMessage(e.getMessage());
+ PigStats failed = PigStatsUtil.getPigStats(ReturnCode.FAILURE);
stats.add(failed);
}
}
-
+
if (!stats.isEmpty()) {
setPigStats(stats);;
}
return stats;
}
-
+
/**
* Run multiple instances of bound pipeline on Hadoop in parallel.
* @param propfile File with properties that Pig should set when running the script.
@@ -210,12 +210,12 @@ public class BoundScript {
prop.load(fin);
} finally {
if (fin != null) fin.close();
- }
+ }
return run(prop);
}
/**
- * Run illustrate for this pipeline. Results will be printed to stdout.
+ * Run illustrate for this pipeline. Results will be printed to stdout.
* @throws IOException if illustrate fails.
*/
public void illustrate() throws IOException {
@@ -255,10 +255,10 @@ public class BoundScript {
}
PigServer pigServer = new PigServer(scriptContext.getPigContext(), false);
registerQuery(pigServer, queries.get(0));
- pigServer.dumpSchema(alias);
+ pigServer.dumpSchema(alias);
}
- //-------------------------------------------------------------------------
+ //-------------------------------------------------------------------------
private PigStats exec(String query) throws IOException {
LOG.info("Query to run:\n" + query);
@@ -271,30 +271,27 @@ public class BoundScript {
ScriptState.get().registerListener(listener);
}
PigServer pigServer = new PigServer(scriptContext.getPigContext(), false);
- pigServer.setBatchOn();
GruntParser grunt = new GruntParser(new StringReader(query), pigServer);
grunt.setInteractive(false);
try {
- grunt.parseStopOnError(true);
+ grunt.parseStopOnError(false);
} catch (ParseException e) {
throw new IOException("Failed to parse script " + e.getMessage(), e);
}
- pigServer.executeBatch();
return PigStats.get();
}
private void registerQuery(PigServer pigServer, String pl) throws IOException {
GruntParser grunt = new GruntParser(new StringReader(pl), pigServer);
grunt.setInteractive(false);
- pigServer.setBatchOn();
try {
- grunt.parseStopOnError(true);
+ grunt.parseStopOnError(false);
} catch (ParseException e) {
throw new IOException("Failed to parse query: " + pl, e);
}
}
-
- private void setPigStats(PigStats stats) {
+
+ private void setPigStats(PigStats stats) {
ScriptEngine engine = scriptContext.getScriptEngine();
if (name != null) {
engine.setPigStats(name, stats);
@@ -304,28 +301,28 @@ public class BoundScript {
}
private void setPigStats(List<PigStats> lst) {
- if (lst == null || lst.isEmpty()) return;
+ if (lst == null || lst.isEmpty()) return;
String key = (name != null) ? name : this.toString();
ScriptEngine engine = scriptContext.getScriptEngine();
for (PigStats stats : lst) {
engine.setPigStats(key, stats);
- }
+ }
}
-
+
//-------------------------------------------------------------------------
-
+
private class MyCallable implements Callable<PigStats> {
-
+
private String query = null;
private PigContext ctx = null;
private PigProgressNotificationListener adaptor;
-
+
public MyCallable(String pl, PigContext ctx, PigProgressNotificationListener adaptor) {
query = pl;
this.ctx = ctx;
this.adaptor = adaptor;
}
-
+
@Override
public PigStats call() throws Exception {
LOG.info("Query to run:\n" + query);
@@ -335,15 +332,13 @@ public class BoundScript {
ScriptState.get().setScript(query);
ScriptState.get().registerListener(adaptor);
PigServer pigServer = new PigServer(ctx, true);
- pigServer.setBatchOn();
GruntParser grunt = new GruntParser(new StringReader(query), pigServer);
grunt.setInteractive(false);
try {
- grunt.parseStopOnError(true);
+ grunt.parseStopOnError(false);
} catch (ParseException e) {
throw new IOException("Failed to parse script", e);
}
- pigServer.executeBatch();
return PigStats.get();
}
}
Modified: pig/trunk/test/org/apache/pig/test/TestPigServerLocal.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigServerLocal.java?rev=1706711&r1=1706710&r2=1706711&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigServerLocal.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigServerLocal.java Sun Oct 4 18:22:32 2015
@@ -50,8 +50,10 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.PropertiesUtil;
+import org.apache.pig.scripting.ScriptEngine;
import org.apache.pig.tools.grunt.Grunt;
import org.apache.pig.tools.grunt.GruntParser;
+import org.apache.pig.tools.pigstats.PigStats;
import org.junit.Before;
import org.junit.Test;
@@ -249,21 +251,23 @@ public class TestPigServerLocal {
@Test
public void testSkipParseInRegisterForBatch() throws Throwable {
- // numTimesInitiated = 10. 4 (once per registerQuery) + 6 (launchPlan->RandomSampleLoader,
- // InputSizeReducerEstimator, getSplits->RandomSampleLoader,
- // createRecordReader->RandomSampleLoader, getSplits, createRecordReader)
- // numTimesSchemaCalled = 4 (once per registerQuery)
if (Util.getLocalTestMode().toString().startsWith("TEZ")) {
_testSkipParseInRegisterForBatch(false, 8, 4);
_testSkipParseInRegisterForBatch(true, 5, 1);
+ _testParseBatchWithScripting(5, 1);
} else {
+ // numTimesInitiated = 10. 4 (once per registerQuery) + 6 (launchPlan->RandomSampleLoader,
+ // InputSizeReducerEstimator, getSplits->RandomSampleLoader,
+ // createRecordReader->RandomSampleLoader, getSplits, createRecordReader)
+ // numTimesSchemaCalled = 4 (once per registerQuery)
_testSkipParseInRegisterForBatch(false, 10, 4);
+ // numTimesInitiated = 7 (parseAndBuild, launchPlan->RandomSampleLoader,
+ // InputSizeReducerEstimator, getSplits->RandomSampleLoader,
+ // createRecordReader->RandomSampleLoader, getSplits, createRecordReader)
+ // numTimesSchemaCalled = 1 (parseAndBuild)
_testSkipParseInRegisterForBatch(true, 7, 1);
+ _testParseBatchWithScripting(7, 1);
}
- // numTimesInitiated = 7 (parseAndBuild, launchPlan->RandomSampleLoader,
- // InputSizeReducerEstimator, getSplits->RandomSampleLoader,
- // createRecordReader->RandomSampleLoader, getSplits, createRecordReader)
- // numTimesSchemaCalled = 1 (parseAndBuild)
}
@Test
@@ -322,6 +326,53 @@ public class TestPigServerLocal {
}
assertEquals(numTimesInitiated, MockTrackingStorage.numTimesInitiated);
+ assertEquals(numTimesSchemaCalled, MockTrackingStorage.numTimesSchemaCalled);
+ List<Tuple> out = data.get("bar");
+ assertEquals(2, out.size());
+ assertEquals(tuple("a", 1, "b"), out.get(0));
+ assertEquals(tuple("b", 2, "c"), out.get(1));
+ }
+
+ private void _testParseBatchWithScripting(int numTimesInitiated, int numTimesSchemaCalled) throws Throwable {
+ MockTrackingStorage.numTimesInitiated = 0;
+ MockTrackingStorage.numTimesSchemaCalled = 0;
+
+ String[] script = {
+ "#!/usr/bin/python",
+ "from org.apache.pig.scripting import *",
+ "P = Pig.compile(\"\"\"" +
+ "A = load 'foo' USING org.apache.pig.test.TestPigServerLocal\\$MockTrackingStorage();" +
+ "B = order A by $0,$1,$2;" +
+ "C = LIMIT B 2;" +
+ "store C into 'bar' USING mock.Storage();" +
+ "\"\"\")",
+ "Q = P.bind()",
+ "stats = Q.runSingle()",
+ "if stats.isSuccessful():",
+ "\tprint 'success!'",
+ "else:",
+ "\traise 'failed'"
+ };
+
+ Properties properties = new Properties();
+ properties.setProperty("io.sort.mb", "2");
+ PigContext pigContext = new PigContext(Util.getLocalTestMode(), properties);
+ PigServer pigServer = new PigServer(pigContext);
+ Data data = resetData(pigContext);
+ data.set("foo", tuple("a", 1, "b"), tuple("b", 2, "c"), tuple("c", 3, "d"));
+
+ String scriptFile = tempDir + File.separator + "_testParseBatchWithScripting.py";
+ Util.createLocalInputFile(scriptFile , script);
+ ScriptEngine scriptEngine = ScriptEngine.getInstance("jython");
+ Map<String, List<PigStats>> statsMap = scriptEngine.run(pigServer.getPigContext(), scriptFile);
+
+ for (List<PigStats> stats : statsMap.values()) {
+ for (PigStats s : stats) {
+ assertTrue(s.isSuccessful());
+ }
+ }
+
+ assertEquals(numTimesInitiated, MockTrackingStorage.numTimesInitiated);
assertEquals(numTimesSchemaCalled, MockTrackingStorage.numTimesSchemaCalled);
List<Tuple> out = data.get("bar");
assertEquals(2, out.size());