Posted to commits@pig.apache.org by ro...@apache.org on 2016/02/16 19:19:09 UTC

svn commit: r1730727 - in /pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/ test/org/apache/pig/test/

Author: rohini
Date: Tue Feb 16 18:19:09 2016
New Revision: 1730727

URL: http://svn.apache.org/viewvc?rev=1730727&view=rev
Log:
PIG-4806: UDFContext can be reset in the middle during Tez input and output initialization (rohini)
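
The hazard named in this log line is a shared, JVM-wide UDFContext being cleared while Tez inputs and outputs are still initializing against it. The toy program below is an illustration only (the class, method and property names are made up and none of it is Pig code): it shows how state recorded by an input disappears if a reset runs during initialization, and why deferring the cleanup to the end of the task avoids the problem.

    import java.util.Properties;

    // Toy illustration, not Pig code: a JVM-wide context in the spirit of
    // UDFContext. If it is cleared while another component is still
    // initializing against it, that component's state silently disappears.
    public class ResetDuringInitDemo {

        private static final Properties SHARED_CONTEXT = new Properties();

        // An input records state it will need later (think: a LoadFunc
        // stashing its required-fields list under its UDFContext signature).
        static void inputInitialize() {
            SHARED_CONTEXT.setProperty("required.fields", "1,2");
        }

        // The processor's own initialization. Resetting the shared context
        // here is the failure mode the commit title describes.
        static void processorInitialize(boolean resetDuringInit) {
            if (resetDuringInit) {
                SHARED_CONTEXT.clear();
            }
        }

        static void runTask() {
            String fields = SHARED_CONTEXT.getProperty("required.fields");
            System.out.println(fields == null
                    ? "BUG: input state was reset during initialization"
                    : "OK: context still holds required.fields=" + fields);
        }

        public static void main(String[] args) {
            inputInitialize();
            processorInitialize(true);   // reset during init -> prints the BUG line
            runTask();

            inputInitialize();
            processorInitialize(false);  // cleanup deferred -> prints the OK line
            runTask();
            SHARED_CONTEXT.clear();      // cleanup happens only once the task is done
        }
    }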

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/JVMReuseImpl.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
    pig/trunk/test/org/apache/pig/test/TestUDFContext.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1730727&r1=1730726&r2=1730727&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Feb 16 18:19:09 2016
@@ -93,6 +93,8 @@ PIG-4639: Add better parser for Apache H
 
 BUG FIXES
 
+PIG-4806: UDFContext can be reset in the middle during Tez input and output initialization (rohini)
+
 PIG-4808: PluckTuple overwrites regex if used more than once in the same script (eyal via daijy)
 
 PIG-4801: Provide backward compatibility with mapreduce mapred.task settings (rohini)

Modified: pig/trunk/src/org/apache/pig/JVMReuseImpl.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/JVMReuseImpl.java?rev=1730727&r1=1730726&r2=1730727&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/JVMReuseImpl.java (original)
+++ pig/trunk/src/org/apache/pig/JVMReuseImpl.java Tue Feb 16 18:19:09 2016
@@ -29,7 +29,6 @@ import org.apache.pig.classification.Int
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.util.SpillableMemoryManager;
-import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 @InterfaceAudience.Private
@@ -45,7 +44,6 @@ public class JVMReuseImpl {
         SpillableMemoryManager.staticDataCleanup();
         PhysicalOperator.staticDataCleanup();
         PigContext.staticDataCleanup();
-        UDFContext.staticDataCleanup();
         PigGenericMapReduce.staticDataCleanup();
         PigStatusReporter.staticDataCleanup();
         PigCombiner.Combine.staticDataCleanup();
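
With UDFContext.staticDataCleanup() removed from the shared cleanup above, clearing UDFContext is no longer an automatic side effect of JVMReuseImpl.cleanupStaticData(); a caller that still wants both has to say so explicitly, which is what the PigProcessor change further down does in close(). A minimal sketch of that pairing (the wrapping class and method names are made up for illustration; the two calls themselves appear in the PigProcessor hunk):

    import org.apache.pig.JVMReuseImpl;
    import org.apache.pig.impl.util.UDFContext;

    // Sketch only: the teardown a task runs at a point where resetting
    // UDFContext is known to be safe (e.g. PigProcessor.close() after this
    // commit). TaskTeardownSketch/cleanupAfterTask are illustrative names.
    public class TaskTeardownSketch {
        public static void cleanupAfterTask() {
            new JVMReuseImpl().cleanupStaticData(); // other per-JVM static state
            UDFContext.staticDataCleanup();         // UDFContext, now cleared explicitly
        }
    }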

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=1730727&r1=1730726&r2=1730727&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Tue Feb 16 18:19:09 2016
@@ -42,6 +42,7 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 public class PigCombiner {
@@ -93,6 +94,7 @@ public class PigCombiner {
                 pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
                 if (pigContext.getLog4jProperties()!=null)
                     PropertyConfigurator.configure(pigContext.getLog4jProperties());
+                UDFContext.getUDFContext().reset();
                 MapRedUtil.setupUDFContext(context.getConfiguration());
 
                 cp = (PhysicalPlan) ObjectSerializer.deserialize(jConf
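
The added reset() matters for reused JVMs: without it, MapRedUtil.setupUDFContext() would layer the current job's serialized state on top of whatever a previous task left behind, and a UDF that caches settings in UDFContext could read stale values. A hypothetical UDF of that kind is sketched below (ContextReadingUdf and the "my.setting" key are made up; the UDFContext calls are the real API used elsewhere in this diff):

    import java.io.IOException;
    import java.util.Properties;

    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.impl.util.UDFContext;

    // Hypothetical UDF: it trusts UDFContext to hold only the current job's
    // properties. The reset() added above ensures the combiner rebuilds that
    // context from the current configuration when the JVM is reused.
    public class ContextReadingUdf extends EvalFunc<String> {
        @Override
        public String exec(Tuple input) throws IOException {
            Properties props = UDFContext.getUDFContext().getUDFProperties(getClass());
            return props.getProperty("my.setting", "default");
        }
    }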

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1730727&r1=1730726&r2=1730727&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java Tue Feb 16 18:19:09 2016
@@ -198,6 +198,8 @@ public class PigProcessor extends Abstra
         // The Reporter and Context objects hold TezProcessorContextImpl
         // which holds input and its sort buffers which are huge.
         new JVMReuseImpl().cleanupStaticData();
+        // Do only in close() and not initialize().
+        UDFContext.staticDataCleanup();
     }
 
     @Override

Modified: pig/trunk/test/org/apache/pig/test/TestUDFContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestUDFContext.java?rev=1730727&r1=1730726&r2=1730727&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestUDFContext.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestUDFContext.java Tue Feb 16 18:19:09 2016
@@ -42,6 +42,8 @@ import org.apache.pig.impl.util.UDFConte
 import org.junit.AfterClass;
 import org.junit.Test;
 
+import com.google.common.io.Files;
+
 public class TestUDFContext {
 
     @AfterClass
@@ -124,7 +126,10 @@ public class TestUDFContext {
         Storage.Data data = resetData(pigServer);
 
         String inputPath = Util.encodeEscape(inputFile.getAbsolutePath());
-        String query = "A = LOAD '" + inputPath + "' USING PigStorage();"
+        File file = Files.createTempDir();
+        file.deleteOnExit();
+        String outputFile = Util.generateURI(file.getPath() + File.pathSeparator + "out", pigServer.getPigContext());
+        String query = "A = LOAD '" + inputPath + "' USING PigStorage() as (f1, f2, f3, f4, f5);"
                 + "B = LOAD '" + inputPath + "' USING PigStorage();"
                 + "B = FOREACH B GENERATE $0, $1;"
                 + "C = LOAD '" + inputPath + "' USING " + FieldsByIndexLoader.class.getName() + "('1,2');"
@@ -132,7 +137,8 @@ public class TestUDFContext {
                 + "C = FOREACH C GENERATE *, B.$0;"
                 + "STORE A INTO 'A' USING mock.Storage();"
                 + "STORE B INTO 'B' USING mock.Storage();"
-                + "STORE C INTO 'C' USING mock.Storage();";
+                + "STORE C INTO 'C' USING mock.Storage();"
+                + "STORE A INTO '" + outputFile + "' USING " + FieldsByIndexLoader.class.getName() + "();";
 
         pigServer.registerQuery(query);
 
@@ -161,6 +167,10 @@ public class TestUDFContext {
         private boolean frontend = false;
         private Properties props = UDFContext.getUDFContext().getUDFProperties(this.getClass());
         private boolean[] selectedFields = new boolean[5]; //Assuming data always has 5 columns
+        private String storeSignature;
+
+        public FieldsByIndexLoader() {
+        }
 
         public FieldsByIndexLoader(String fieldIndices) {
             String[] requiredFields = fieldIndices.split(",");
@@ -186,6 +196,32 @@ public class TestUDFContext {
             return super.getSchema(location, job);
         }
 
+        @Override
+        public void checkSchema(ResourceSchema s) throws IOException {
+            super.checkSchema(s);
+            UDFContext udfContext = UDFContext.getUDFContext();
+            Properties props = udfContext.getUDFProperties(this.getClass(), new String[]{storeSignature});
+            props.setProperty("testkey", "testvalue");
+        }
+
+        @Override
+        public void setStoreFuncUDFContextSignature(String signature) {
+            this.storeSignature = signature;
+        }
+
+        @Override
+        public void setStoreLocation(String location, Job job)
+                throws IOException {
+            if (!UDFContext.getUDFContext().isFrontend()) {
+                Properties udfProps = UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{storeSignature});
+                if (!("testvalue").equals(udfProps.getProperty("testkey"))) {
+                    throw new IOException("UDFContext does not have expected values");
+                }
+            }
+            super.setStoreLocation(location, job);
+        }
+
+
     }
 
 }
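
The new test path above exercises the standard frontend-to-backend hand-off for a StoreFunc: checkSchema() runs on the frontend and stashes a value under the store signature, and setStoreLocation() on the backend expects to find it again; before this fix, a cleanup running during Tez initialization could wipe it in between. A standalone version of the same pattern is sketched below (SchemaForwardingStorage and the "declared.schema" key are hypothetical and not part of this commit; PigStorage, UDFContext and the overridden methods are the same ones used by the test):

    import java.io.IOException;
    import java.util.Properties;

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.pig.ResourceSchema;
    import org.apache.pig.builtin.PigStorage;
    import org.apache.pig.impl.util.UDFContext;

    // Hypothetical StoreFunc: forwards a value computed on the frontend to the
    // backend through UDFContext, keyed by the store signature.
    public class SchemaForwardingStorage extends PigStorage {

        private String signature;

        @Override
        public void setStoreFuncUDFContextSignature(String signature) {
            this.signature = signature;
            super.setStoreFuncUDFContextSignature(signature);
        }

        @Override
        public void checkSchema(ResourceSchema s) throws IOException {
            super.checkSchema(s);
            // Frontend: remember the declared schema for this particular STORE.
            Properties props = UDFContext.getUDFContext()
                    .getUDFProperties(getClass(), new String[] { signature });
            props.setProperty("declared.schema", s.toString());
        }

        @Override
        public void setStoreLocation(String location, Job job) throws IOException {
            // Backend: the frontend value must still be there; if a cleanup had
            // reset UDFContext during initialization it would be gone.
            Properties props = UDFContext.getUDFContext()
                    .getUDFProperties(getClass(), new String[] { signature });
            if (!UDFContext.getUDFContext().isFrontend()
                    && props.getProperty("declared.schema") == null) {
                throw new IOException("UDFContext lost the frontend state");
            }
            super.setStoreLocation(location, job);
        }
    }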