You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2016/02/16 19:19:09 UTC
svn commit: r1730727 - in /pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/
test/org/apache/pig/test/
Author: rohini
Date: Tue Feb 16 18:19:09 2016
New Revision: 1730727
URL: http://svn.apache.org/viewvc?rev=1730727&view=rev
Log:
PIG-4806: UDFContext can be reset in the middle during Tez input and output initialization (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/JVMReuseImpl.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
pig/trunk/test/org/apache/pig/test/TestUDFContext.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1730727&r1=1730726&r2=1730727&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Feb 16 18:19:09 2016
@@ -93,6 +93,8 @@ PIG-4639: Add better parser for Apache H
BUG FIXES
+PIG-4806: UDFContext can be reset in the middle during Tez input and output initialization (rohini)
+
PIG-4808: PluckTuple overwrites regex if used more than once in the same script (eyal via daijy)
PIG-4801: Provide backward compatibility with mapreduce mapred.task settings (rohini)
Modified: pig/trunk/src/org/apache/pig/JVMReuseImpl.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/JVMReuseImpl.java?rev=1730727&r1=1730726&r2=1730727&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/JVMReuseImpl.java (original)
+++ pig/trunk/src/org/apache/pig/JVMReuseImpl.java Tue Feb 16 18:19:09 2016
@@ -29,7 +29,6 @@ import org.apache.pig.classification.Int
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.SpillableMemoryManager;
-import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.PigStatusReporter;
@InterfaceAudience.Private
@@ -45,7 +44,6 @@ public class JVMReuseImpl {
SpillableMemoryManager.staticDataCleanup();
PhysicalOperator.staticDataCleanup();
PigContext.staticDataCleanup();
- UDFContext.staticDataCleanup();
PigGenericMapReduce.staticDataCleanup();
PigStatusReporter.staticDataCleanup();
PigCombiner.Combine.staticDataCleanup();
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=1730727&r1=1730726&r2=1730727&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Tue Feb 16 18:19:09 2016
@@ -42,6 +42,7 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.PigStatusReporter;
public class PigCombiner {
@@ -93,6 +94,7 @@ public class PigCombiner {
pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
if (pigContext.getLog4jProperties()!=null)
PropertyConfigurator.configure(pigContext.getLog4jProperties());
+ UDFContext.getUDFContext().reset();
MapRedUtil.setupUDFContext(context.getConfiguration());
cp = (PhysicalPlan) ObjectSerializer.deserialize(jConf
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1730727&r1=1730726&r2=1730727&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java Tue Feb 16 18:19:09 2016
@@ -198,6 +198,8 @@ public class PigProcessor extends Abstra
// The Reporter and Context objects hold TezProcessorContextImpl
// which holds input and its sort buffers which are huge.
new JVMReuseImpl().cleanupStaticData();
+ // Do only in close() and not initialize().
+ UDFContext.staticDataCleanup();
}
@Override
Modified: pig/trunk/test/org/apache/pig/test/TestUDFContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestUDFContext.java?rev=1730727&r1=1730726&r2=1730727&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestUDFContext.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestUDFContext.java Tue Feb 16 18:19:09 2016
@@ -42,6 +42,8 @@ import org.apache.pig.impl.util.UDFConte
import org.junit.AfterClass;
import org.junit.Test;
+import com.google.common.io.Files;
+
public class TestUDFContext {
@AfterClass
@@ -124,7 +126,10 @@ public class TestUDFContext {
Storage.Data data = resetData(pigServer);
String inputPath = Util.encodeEscape(inputFile.getAbsolutePath());
- String query = "A = LOAD '" + inputPath + "' USING PigStorage();"
+ File file = Files.createTempDir();
+ file.deleteOnExit();
+ String outputFile = Util.generateURI(file.getPath() + File.pathSeparator + "out", pigServer.getPigContext());
+ String query = "A = LOAD '" + inputPath + "' USING PigStorage() as (f1, f2, f3, f4, f5);"
+ "B = LOAD '" + inputPath + "' USING PigStorage();"
+ "B = FOREACH B GENERATE $0, $1;"
+ "C = LOAD '" + inputPath + "' USING " + FieldsByIndexLoader.class.getName() + "('1,2');"
@@ -132,7 +137,8 @@ public class TestUDFContext {
+ "C = FOREACH C GENERATE *, B.$0;"
+ "STORE A INTO 'A' USING mock.Storage();"
+ "STORE B INTO 'B' USING mock.Storage();"
- + "STORE C INTO 'C' USING mock.Storage();";
+ + "STORE C INTO 'C' USING mock.Storage();"
+ + "STORE A INTO '" + outputFile + "' USING " + FieldsByIndexLoader.class.getName() + "();";
pigServer.registerQuery(query);
@@ -161,6 +167,10 @@ public class TestUDFContext {
private boolean frontend = false;
private Properties props = UDFContext.getUDFContext().getUDFProperties(this.getClass());
private boolean[] selectedFields = new boolean[5]; //Assuming data always has 5 columns
+ private String storeSignature;
+
+ public FieldsByIndexLoader() {
+ }
public FieldsByIndexLoader(String fieldIndices) {
String[] requiredFields = fieldIndices.split(",");
@@ -186,6 +196,32 @@ public class TestUDFContext {
return super.getSchema(location, job);
}
+ @Override
+ public void checkSchema(ResourceSchema s) throws IOException {
+ super.checkSchema(s);
+ UDFContext udfContext = UDFContext.getUDFContext();
+ Properties props = udfContext.getUDFProperties(this.getClass(), new String[]{storeSignature});
+ props.setProperty("testkey", "testvalue");
+ }
+
+ @Override
+ public void setStoreFuncUDFContextSignature(String signature) {
+ this.storeSignature = signature;
+ }
+
+ @Override
+ public void setStoreLocation(String location, Job job)
+ throws IOException {
+ if (!UDFContext.getUDFContext().isFrontend()) {
+ Properties udfProps = UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{storeSignature});
+ if (!("testvalue").equals(udfProps.getProperty("testkey"))) {
+ throw new IOException("UDFContext does not have expected values");
+ }
+ }
+ super.setStoreLocation(location, job);
+ }
+
+
}
}