You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@pig.apache.org by ro...@apache.org on 2014/11/09 20:44:02 UTC
svn commit: r1637725 - in /pig/branches/branch-0.14: ./
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/tez/
src/org/apache/pig/tools/pigstats/tez/ test/ test/org/apache/pig/test/
Author: rohini
Date: Sun Nov 9 19:44:01 2014
New Revision: 1637725
URL: http://svn.apache.org/r1637725
Log:
PIG-4316: Port TestHBaseStorage to tez local mode (rohini)
Modified:
pig/branches/branch-0.14/CHANGES.txt
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
pig/branches/branch-0.14/test/org/apache/pig/test/TestHBaseStorage.java
pig/branches/branch-0.14/test/tez-local-tests
Modified: pig/branches/branch-0.14/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/CHANGES.txt?rev=1637725&r1=1637724&r2=1637725&view=diff
==============================================================================
--- pig/branches/branch-0.14/CHANGES.txt (original)
+++ pig/branches/branch-0.14/CHANGES.txt Sun Nov 9 19:44:01 2014
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4316: Port TestHBaseStorage to tez local mode (rohini)
+
PIG-4224: Upload Tez payload history string to timeline server (daijy)
PIG-3977: Get TezStats working for Oozie (rohini)
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java?rev=1637725&r1=1637724&r2=1637725&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java Sun Nov 9 19:44:01 2014
@@ -279,14 +279,18 @@ public abstract class Launcher {
public class JobControlThreadExceptionHandler implements Thread.UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread thread, Throwable throwable) {
- jobControlExceptionStackTrace = Utils.getStackStraceStr(throwable);
- try {
- jobControlException = getExceptionFromString(jobControlExceptionStackTrace);
- } catch (Exception e) {
- String errMsg = "Could not resolve error that occured when launching job: "
- + jobControlExceptionStackTrace;
- jobControlException = new RuntimeException(errMsg, throwable);
- }
+ setJobException(throwable);
+ }
+ }
+
+ protected void setJobException(Throwable throwable) {
+ jobControlExceptionStackTrace = Utils.getStackStraceStr(throwable);
+ try {
+ jobControlException = getExceptionFromString(jobControlExceptionStackTrace);
+ } catch (Exception e) {
+ String errMsg = "Could not resolve error that occured when launching job: "
+ + jobControlExceptionStackTrace;
+ jobControlException = new RuntimeException(errMsg, throwable);
}
}
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java?rev=1637725&r1=1637724&r2=1637725&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java Sun Nov 9 19:44:01 2014
@@ -17,6 +17,8 @@
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+import java.io.IOException;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -26,8 +28,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.impl.util.UriUtil;
-import java.io.IOException;
-
/**
* Class that computes the size of output for file-based systems.
*/
@@ -43,19 +43,23 @@ public class FileBasedOutputSizeReader i
*/
@Override
public boolean supports(POStore sto, Configuration conf) {
- String storeFuncName = sto.getStoreFunc().getClass().getCanonicalName();
- // Some store functions do not support file-based output reader (e.g.
- // HCatStorer), so they should be excluded.
- String unsupported = conf.get(
- PigStatsOutputSizeReader.OUTPUT_SIZE_READER_UNSUPPORTED);
- if (unsupported != null) {
- for (String s : unsupported.split(",")) {
- if (s.equalsIgnoreCase(storeFuncName)) {
- return false;
+ boolean nullOrSupportedScheme = UriUtil.isHDFSFileOrLocalOrS3N(getLocationUri(sto), conf);
+ if (nullOrSupportedScheme) {
+ // Some store functions that do not have scheme
+ // do not support file-based output reader (e.g.HCatStorer),
+ // so they should be excluded.
+ String unsupported = conf.get(
+ PigStatsOutputSizeReader.OUTPUT_SIZE_READER_UNSUPPORTED);
+ if (unsupported != null) {
+ String storeFuncName = sto.getStoreFunc().getClass().getCanonicalName();
+ for (String s : unsupported.split(",")) {
+ if (s.equalsIgnoreCase(storeFuncName)) {
+ return false;
+ }
}
}
}
- return UriUtil.isHDFSFileOrLocalOrS3N(getLocationUri(sto), conf);
+ return nullOrSupportedScheme;
}
/**
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1637725&r1=1637724&r2=1637725&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Sun Nov 9 19:44:01 2014
@@ -93,6 +93,7 @@ public class PigInputFormat extends Inpu
Configuration conf = context.getConfiguration();
PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer
.deserialize(conf.get("udf.import.list")));
+ MapRedUtil.setupUDFContext(conf);
LoadFunc loadFunc = getLoadFunc(pigSplit.getInputIndex(), conf);
// Pass loader signature to LoadFunc and to InputFormat through
// the conf
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1637725&r1=1637724&r2=1637725&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Sun Nov 9 19:44:01 2014
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.pig.PigConfiguration;
+import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.counters.TezCounters;
@@ -153,6 +154,7 @@ public class TezJob implements Runnable
@Override
public void run() {
+ UDFContext udfContext = UDFContext.getUDFContext();
try {
tezClient = TezSessionManager.getClient(conf, requestAMResources,
dag.getCredentials(), tezJobConf);
@@ -182,12 +184,15 @@ public class TezJob implements Runnable
}
if (dagStatus.isCompleted()) {
+ // For tez_local mode where PigProcessor destroys all UDFContext
+ UDFContext.setUdfContext(udfContext);
+
log.info("DAG Status: " + dagStatus);
dagCounters = dagStatus.getDAGCounters();
TezSessionManager.freeSession(tezClient);
try {
pigStats.accumulateStats(this);
- } catch (IOException e) {
+ } catch (Exception e) {
log.warn("Exception while gathering stats", e);
}
try {
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1637725&r1=1637724&r2=1637725&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Sun Nov 9 19:44:01 2014
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -34,6 +35,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.PigConfiguration;
+import org.apache.pig.PigException;
import org.apache.pig.PigWarning;
import org.apache.pig.backend.BackendException;
import org.apache.pig.backend.executionengine.ExecException;
@@ -62,6 +64,7 @@ import org.apache.pig.impl.plan.Compilat
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.LogUtils;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
@@ -90,8 +93,10 @@ public class TezLauncher extends Launche
public TezLauncher() {
if (namedThreadFactory == null) {
- namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(
- "PigTezLauncher-%d").build();
+ namedThreadFactory = new ThreadFactoryBuilder()
+ .setNameFormat("PigTezLauncher-%d")
+ .setUncaughtExceptionHandler(new JobControlThreadExceptionHandler())
+ .build();
}
executor = Executors.newSingleThreadExecutor(namedThreadFactory);
}
@@ -154,18 +159,15 @@ public class TezLauncher extends Launche
// Set the thread UDFContext so registered classes are available.
final UDFContext udfContext = UDFContext.getUDFContext();
- Thread task = new Thread(runningJob) {
+ Runnable task = new Runnable() {
@Override
public void run() {
+ Thread.currentThread().setContextClassLoader(PigContext.getClassLoader());
UDFContext.setUdfContext(udfContext.clone());
- super.run();
+ runningJob.run();
}
};
- JobControlThreadExceptionHandler jctExceptionHandler = new JobControlThreadExceptionHandler();
- task.setUncaughtExceptionHandler(jctExceptionHandler);
- task.setContextClassLoader(PigContext.getClassLoader());
-
// Mark the times that the jobs were submitted so it's reflected in job
// history props. TODO: Fix this. unused now
long scriptSubmittedTimestamp = System.currentTimeMillis();
@@ -193,6 +195,15 @@ public class TezLauncher extends Launche
reporter.notifyUpdate();
Thread.sleep(1000);
}
+ // For tez_local mode where PigProcessor destroys all UDFContext
+ UDFContext.setUdfContext(udfContext);
+ try {
+ // In case of FutureTask there is no uncaught exception
+ // Need to do future.get() to get any exception
+ future.get();
+ } catch (ExecutionException e) {
+ setJobException(e.getCause());
+ }
}
processedDAGs++;
if (tezPlanContainer.size() == processedDAGs) {
@@ -201,6 +212,7 @@ public class TezLauncher extends Launche
tezScriptState.emitProgressUpdatedNotification(
((tezPlanContainer.size() - processedDAGs)/tezPlanContainer.size()) * 100);
}
+ handleUnCaughtException(pc);
tezPlanContainer.updatePlan(tezPlan, reporter.notifyFinishedOrFailed());
}
@@ -231,6 +243,28 @@ public class TezLauncher extends Launche
return tezStats;
}
+ private void handleUnCaughtException(PigContext pc) throws Exception {
+ //check for the uncaught exceptions from TezJob thread
+ //if the job controller fails before launching the jobs then there are
+ //no jobs to check for failure
+ if (jobControlException != null) {
+ if (jobControlException instanceof PigException) {
+ if (jobControlExceptionStackTrace != null) {
+ LogUtils.writeLog("Error message from Tez Job",
+ jobControlExceptionStackTrace, pc
+ .getProperties().getProperty(
+ "pig.logfile"), log);
+ }
+ throw jobControlException;
+ } else {
+ int errCode = 2117;
+ String msg = "Unexpected error when launching Tez job.";
+ throw new ExecException(msg, errCode, PigException.BUG,
+ jobControlException);
+ }
+ }
+ }
+
private void computeWarningAggregate(Map<String, Map<String, Long>> counterGroups, Map<Enum, Long> aggMap) {
for (Map<String, Long> counters : counterGroups.values()) {
for (Enum e : PigWarning.values()) {
Modified: pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java?rev=1637725&r1=1637724&r2=1637725&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java Sun Nov 9 19:44:01 2014
@@ -249,7 +249,11 @@ public class TezVertexStats extends JobS
counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_WRITTEN) != null) {
hdfsBytesWritten = counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_WRITTEN);
} else {
- hdfsBytesWritten = JobStats.getOutputSize(sto, conf);
+ try {
+ hdfsBytesWritten = JobStats.getOutputSize(sto, conf);
+ } catch (Exception e) {
+ LOG.warn("Error while getting the bytes written for the output " + sto.getSFile(), e);
+ }
}
OutputStats os = new OutputStats(filename, hdfsBytesWritten,
Modified: pig/branches/branch-0.14/test/org/apache/pig/test/TestHBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/TestHBaseStorage.java?rev=1637725&r1=1637724&r2=1637725&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/TestHBaseStorage.java Sun Nov 9 19:44:01 2014
@@ -49,6 +49,7 @@ import org.apache.pig.data.Tuple;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -101,7 +102,7 @@ public class TestHBaseStorage {
@Before
public void beforeTest() throws Exception {
- pig = new PigServer(ExecType.LOCAL, conf);
+ pig = new PigServer(Util.getLocalTestMode(), conf);
}
@After
@@ -125,6 +126,7 @@ public class TestHBaseStorage {
deletes.add(new Delete(row.getRow()));
}
table.delete(deletes);
+ table.close();
}
/**
@@ -825,7 +827,7 @@ public class TestHBaseStorage {
Assert.assertEquals(100, index);
LOG.info("testLoadWithProjection_2 done");
}
-
+
/**
* Test merge inner join with two tables
*
@@ -833,6 +835,7 @@ public class TestHBaseStorage {
*/
@Test
public void testMergeJoin() throws IOException {
+ Assume.assumeTrue("Skip this test for TEZ. See PIG-4315", pig.getPigContext().getExecType().equals(ExecType.LOCAL));
prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
prepareTable(TESTTABLE_2, true, DataFormat.HBaseBinary);
pig.registerQuery("a = load 'hbase://" + TESTTABLE_1 + "' using "
@@ -853,7 +856,7 @@ public class TestHBaseStorage {
Tuple t = it.next();
// the columns for both relations should be merged into one tuple
// left side
- String rowKey = (String) t.get(0);
+ String rowKey = (String) t.get(0);
int col_a = (Integer) t.get(1);
double col_b = (Double) t.get(2);
String col_c = (String) t.get(3);
@@ -875,7 +878,7 @@ public class TestHBaseStorage {
Assert.assertEquals(count, col_a2);
Assert.assertEquals(count + 0.0, col_b2, 1e-6);
Assert.assertEquals("Text_" + count, col_c2);
-
+
count++;
}
Assert.assertEquals(count, TEST_ROW_COUNT);
@@ -883,9 +886,9 @@ public class TestHBaseStorage {
}
/**
- * Test collected group
+ * Test collected group
* not much to test here since keys are unique
- *
+ *
* @throws IOException
*/
@Test
@@ -920,7 +923,7 @@ public class TestHBaseStorage {
int col_a = (Integer) row.get(1);
double col_b = (Double) row.get(2);
String col_c = (String) row.get(3);
-
+
Assert.assertEquals(count, col_a);
Assert.assertEquals(count + 0.0, col_b, 1e-6);
Assert.assertEquals("Text_" + count, col_c);
@@ -1002,6 +1005,7 @@ public class TestHBaseStorage {
pig.getPigContext().getProperties()
.setProperty(MRConfiguration.FILEOUTPUTCOMMITTER_MARKSUCCESSFULJOBS, "false");
+ table.close();
}
/**
@@ -1038,6 +1042,7 @@ public class TestHBaseStorage {
Assert.assertEquals(i + 0.0, col_b, 1e-6);
}
Assert.assertEquals(100, i);
+ table.close();
}
/**
@@ -1074,6 +1079,7 @@ public class TestHBaseStorage {
Assert.assertEquals("Text_" + i, col_c);
}
Assert.assertEquals(100, i);
+ table.close();
}
/**
@@ -1136,6 +1142,7 @@ public class TestHBaseStorage {
Assert.assertEquals(i + 0.0, col_b, 1e-6);
}
Assert.assertEquals(100, i);
+ table.close();
}
/**
@@ -1171,6 +1178,7 @@ public class TestHBaseStorage {
Assert.assertEquals(i + 0.0 + "", col_b);
}
Assert.assertEquals(100, i);
+ table.close();
}
/**
Modified: pig/branches/branch-0.14/test/tez-local-tests
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/tez-local-tests?rev=1637725&r1=1637724&r2=1637725&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/tez-local-tests (original)
+++ pig/branches/branch-0.14/test/tez-local-tests Sun Nov 9 19:44:01 2014
@@ -1,4 +1,5 @@
**/TestAccumuloPigCluster.java
**/TestBigTypeSort.java
**/TestCurrentTime.java
+**/TestHBaseStorage.java
**/TestInvokerGenerator.java