You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/04/15 19:18:42 UTC
svn commit: r1587637 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
test/org/apache/pig/test/TestFetch.java
Author: cheolsoo
Date: Tue Apr 15 17:18:41 2014
New Revision: 1587637
URL: http://svn.apache.org/r1587637
Log:
PIG-3889: Direct fetch doesn't set job submission timestamps (cheolsoo)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
pig/trunk/test/org/apache/pig/test/TestFetch.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1587637&r1=1587636&r2=1587637&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Apr 15 17:18:41 2014
@@ -107,6 +107,8 @@ PIG-3882: Multiquery off mode execution
BUG FIXES
+PIG-3889: Direct fetch doesn't set job submission timestamps (cheolsoo)
+
PIG-3895: Pigmix run script has compilation error (rohini)
PIG-3885: AccumuloStorage incompatible with Accumulo 1.6.0 (elserj via daijy)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java?rev=1587637&r1=1587636&r2=1587637&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java Tue Apr 15 17:18:41 2014
@@ -78,11 +78,11 @@ public class FetchLauncher {
// run fetch
runPipeline(poStore);
-
+
UDFFinishVisitor udfFinisher = new UDFFinishVisitor(pp,
new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(pp));
udfFinisher.visit();
-
+
return PigStats.start(new EmptyPigStats(pigContext, poStore));
}
finally {
@@ -122,11 +122,15 @@ public class FetchLauncher {
TaskAttemptID taskAttemptID = HadoopShims.getNewTaskAttemptID();
HadoopShims.setTaskAttemptId(conf, taskAttemptID);
-
+
if (!PlanHelper.getPhysicalOperators(pp, POStream.class).isEmpty()) {
MapRedUtil.setupStreamingDirsConfSingle(poStore, pigContext, conf);
}
+ String currentTime = Long.toString(System.currentTimeMillis());
+ conf.set("pig.script.submitted.timestamp", currentTime);
+ conf.set("pig.job.submitted.timestamp", currentTime);
+
PhysicalOperator.setReporter(new FetchProgressableReporter());
SchemaTupleBackend.initialize(conf, pigContext);
@@ -136,13 +140,13 @@ public class FetchLauncher {
udfContext.serialize(conf);
PigMapReduce.sJobConfInternal.set(conf);
- String dtzStr = PigMapReduce.sJobConfInternal.get().get("pig.datetime.default.tz");
+ String dtzStr = conf.get("pig.datetime.default.tz");
if (dtzStr != null && dtzStr.length() > 0) {
// ensure that the internal timezone is uniformly in UTC offset style
DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.forID(dtzStr).getOffset(null)));
}
-
- boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+
+ boolean aggregateWarning = "true".equalsIgnoreCase(conf.get("aggregate.warning"));
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
pigHadoopLogger.setAggregate(aggregateWarning);
PigStatusReporter.getInstance().setFetchContext(new FetchContext());
Modified: pig/trunk/test/org/apache/pig/test/TestFetch.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFetch.java?rev=1587637&r1=1587636&r2=1587637&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestFetch.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestFetch.java Tue Apr 15 17:18:41 2014
@@ -18,14 +18,18 @@
package org.apache.pig.test;
+import static org.apache.pig.builtin.mock.Storage.resetData;
+import static org.apache.pig.builtin.mock.Storage.tuple;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.PrintStream;
import java.util.Iterator;
+import java.util.List;
import java.util.Properties;
import java.util.Random;
@@ -39,17 +43,21 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.builtin.mock.Storage.Data;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.parser.ParserTestingUtils;
import org.apache.pig.test.utils.GenPhyOp;
+import org.joda.time.DateTime;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import com.google.common.collect.Lists;
+
public class TestFetch {
private PigServer pigServer;
@@ -87,6 +95,8 @@ public class TestFetch {
@Before
public void setUp() throws Exception{
pigServer = new PigServer(ExecType.LOCAL, new Properties());
+ // force direct fetch mode
+ pigServer.getPigContext().getProperties().setProperty(PigConfiguration.OPT_FETCH, "true");
}
@Test
@@ -279,6 +289,26 @@ public class TestFetch {
}
+ /**
+ * Tests whether 'pig.job.submitted.timestamp' has been set by FetchLauncher
+ * @throws Exception
+ */
+ @Test
+ public void test7() throws Exception {
+ Data data = resetData(pigServer);
+
+ List<Tuple> justSomeRows = Lists.newArrayListWithCapacity(1);
+ justSomeRows.add(tuple(1));
+ data.set("justSomeRows", justSomeRows);
+
+ pigServer.registerQuery("A = load 'justSomeRows' using mock.Storage();");
+ pigServer.registerQuery("B = foreach A generate CurrentTime();");
+ Iterator<Tuple> it = pigServer.openIterator("B");
+ DateTime received = (DateTime) it.next().get(0);
+ // any returned result indicates that the property was set correctly
+ assertNotNull(received);
+ }
+
@AfterClass
public static void tearDownOnce() throws Exception {
inputFile1.delete();