You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/04/15 19:18:42 UTC

svn commit: r1587637 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java test/org/apache/pig/test/TestFetch.java

Author: cheolsoo
Date: Tue Apr 15 17:18:41 2014
New Revision: 1587637

URL: http://svn.apache.org/r1587637
Log:
PIG-3889: Direct fetch doesn't set job submission timestamps (cheolsoo)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
    pig/trunk/test/org/apache/pig/test/TestFetch.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1587637&r1=1587636&r2=1587637&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Apr 15 17:18:41 2014
@@ -107,6 +107,8 @@ PIG-3882: Multiquery off mode execution 
  
 BUG FIXES
 
+PIG-3889: Direct fetch doesn't set job submission timestamps (cheolsoo)
+
 PIG-3895: Pigmix run script has compilation error (rohini)
 
 PIG-3885: AccumuloStorage incompatible with Accumulo 1.6.0 (elserj via daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java?rev=1587637&r1=1587636&r2=1587637&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java Tue Apr 15 17:18:41 2014
@@ -78,11 +78,11 @@ public class FetchLauncher {
 
             // run fetch
             runPipeline(poStore);
-            
+
             UDFFinishVisitor udfFinisher = new UDFFinishVisitor(pp,
                     new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(pp));
             udfFinisher.visit();
-            
+
             return PigStats.start(new EmptyPigStats(pigContext, poStore));
         }
         finally {
@@ -122,11 +122,15 @@ public class FetchLauncher {
 
         TaskAttemptID taskAttemptID = HadoopShims.getNewTaskAttemptID();
         HadoopShims.setTaskAttemptId(conf, taskAttemptID);
-        
+
         if (!PlanHelper.getPhysicalOperators(pp, POStream.class).isEmpty()) {
             MapRedUtil.setupStreamingDirsConfSingle(poStore, pigContext, conf);
         }
 
+        String currentTime = Long.toString(System.currentTimeMillis());
+        conf.set("pig.script.submitted.timestamp", currentTime);
+        conf.set("pig.job.submitted.timestamp", currentTime);
+
         PhysicalOperator.setReporter(new FetchProgressableReporter());
         SchemaTupleBackend.initialize(conf, pigContext);
 
@@ -136,13 +140,13 @@ public class FetchLauncher {
         udfContext.serialize(conf);
 
         PigMapReduce.sJobConfInternal.set(conf);
-        String dtzStr = PigMapReduce.sJobConfInternal.get().get("pig.datetime.default.tz");
+        String dtzStr = conf.get("pig.datetime.default.tz");
         if (dtzStr != null && dtzStr.length() > 0) {
             // ensure that the internal timezone is uniformly in UTC offset style
             DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.forID(dtzStr).getOffset(null)));
         }
-        
-        boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+
+        boolean aggregateWarning = "true".equalsIgnoreCase(conf.get("aggregate.warning"));
         PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
         pigHadoopLogger.setAggregate(aggregateWarning);
         PigStatusReporter.getInstance().setFetchContext(new FetchContext());

Modified: pig/trunk/test/org/apache/pig/test/TestFetch.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFetch.java?rev=1587637&r1=1587636&r2=1587637&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestFetch.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestFetch.java Tue Apr 15 17:18:41 2014
@@ -18,14 +18,18 @@
 
 package org.apache.pig.test;
 
+import static org.apache.pig.builtin.mock.Storage.resetData;
+import static org.apache.pig.builtin.mock.Storage.tuple;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.PrintStream;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Properties;
 import java.util.Random;
 
@@ -39,17 +43,21 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.builtin.mock.Storage.Data;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
 import org.apache.pig.parser.ParserTestingUtils;
 import org.apache.pig.test.utils.GenPhyOp;
+import org.joda.time.DateTime;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+
 public class TestFetch {
 
     private PigServer pigServer;
@@ -87,6 +95,8 @@ public class TestFetch {
     @Before
     public void setUp() throws Exception{
         pigServer = new PigServer(ExecType.LOCAL, new Properties());
+        // force direct fetch mode on the freshly created local PigServer
+        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.OPT_FETCH, "true");
     }
 
     @Test
@@ -279,6 +289,26 @@ public class TestFetch {
 
     }
 
+    /**
+     * Verifies that FetchLauncher sets 'pig.job.submitted.timestamp' by evaluating CurrentTime() over a one-row mock input.
+     * @throws Exception if registering the queries or iterating over the result fails
+     */
+    @Test
+    public void test7() throws Exception {
+        Data data = resetData(pigServer);
+
+        List<Tuple> justSomeRows = Lists.newArrayListWithCapacity(1);
+        justSomeRows.add(tuple(1));
+        data.set("justSomeRows", justSomeRows);
+
+        pigServer.registerQuery("A = load 'justSomeRows' using mock.Storage();");
+        pigServer.registerQuery("B = foreach A generate CurrentTime();");
+        Iterator<Tuple> it = pigServer.openIterator("B");
+        DateTime received = (DateTime) it.next().get(0);
+        // a non-null DateTime from CurrentTime() is taken as evidence that the timestamp property was set
+        assertNotNull(received);
+    }
+
     @AfterClass
     public static void tearDownOnce() throws Exception {
         inputFile1.delete();