You are viewing a plain-text version of this content. The canonical link for it appeared as a hyperlink ("here") in the original page and is not preserved in this plain-text version.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/11/09 20:44:02 UTC

svn commit: r1637725 - in /pig/branches/branch-0.14: ./ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/tools...

Author: rohini
Date: Sun Nov  9 19:44:01 2014
New Revision: 1637725

URL: http://svn.apache.org/r1637725
Log:
PIG-4316: Port TestHBaseStorage to tez local mode (rohini)

Modified:
    pig/branches/branch-0.14/CHANGES.txt
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
    pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
    pig/branches/branch-0.14/test/org/apache/pig/test/TestHBaseStorage.java
    pig/branches/branch-0.14/test/tez-local-tests

Modified: pig/branches/branch-0.14/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/CHANGES.txt?rev=1637725&r1=1637724&r2=1637725&view=diff
==============================================================================
--- pig/branches/branch-0.14/CHANGES.txt (original)
+++ pig/branches/branch-0.14/CHANGES.txt Sun Nov  9 19:44:01 2014
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
  
 IMPROVEMENTS
 
+PIG-4316: Port TestHBaseStorage to tez local mode (rohini)
+
 PIG-4224: Upload Tez payload history string to timeline server (daijy)
 
 PIG-3977: Get TezStats working for Oozie (rohini)

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java?rev=1637725&r1=1637724&r2=1637725&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java Sun Nov  9 19:44:01 2014
@@ -279,14 +279,18 @@ public abstract class Launcher {
     public class JobControlThreadExceptionHandler implements Thread.UncaughtExceptionHandler {
         @Override
         public void uncaughtException(Thread thread, Throwable throwable) {
-            jobControlExceptionStackTrace = Utils.getStackStraceStr(throwable);
-            try {
-                jobControlException = getExceptionFromString(jobControlExceptionStackTrace);
-            } catch (Exception e) {
-                String errMsg = "Could not resolve error that occured when launching job: "
-                        + jobControlExceptionStackTrace;
-                jobControlException = new RuntimeException(errMsg, throwable);
-            }
+            setJobException(throwable);
+        }
+    }
+
+    protected void setJobException(Throwable throwable) {
+        jobControlExceptionStackTrace = Utils.getStackStraceStr(throwable);
+        try {
+            jobControlException = getExceptionFromString(jobControlExceptionStackTrace);
+        } catch (Exception e) {
+            String errMsg = "Could not resolve error that occured when launching job: "
+                    + jobControlExceptionStackTrace;
+            jobControlException = new RuntimeException(errMsg, throwable);
         }
     }
 

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java?rev=1637725&r1=1637724&r2=1637725&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java Sun Nov  9 19:44:01 2014
@@ -17,6 +17,8 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
+import java.io.IOException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -26,8 +28,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.impl.util.UriUtil;
 
-import java.io.IOException;
-
 /**
  * Class that computes the size of output for file-based systems.
  */
@@ -43,19 +43,23 @@ public class FileBasedOutputSizeReader i
      */
     @Override
     public boolean supports(POStore sto, Configuration conf) {
-        String storeFuncName = sto.getStoreFunc().getClass().getCanonicalName();
-        // Some store functions do not support file-based output reader (e.g.
-        // HCatStorer), so they should be excluded.
-        String unsupported = conf.get(
-                PigStatsOutputSizeReader.OUTPUT_SIZE_READER_UNSUPPORTED);
-        if (unsupported != null) {
-            for (String s : unsupported.split(",")) {
-                if (s.equalsIgnoreCase(storeFuncName)) {
-                    return false;
+        boolean nullOrSupportedScheme = UriUtil.isHDFSFileOrLocalOrS3N(getLocationUri(sto), conf);
+        if (nullOrSupportedScheme) {
+            // Some store functions that do not have scheme
+            // do not support file-based output reader (e.g.HCatStorer),
+            // so they should be excluded.
+            String unsupported = conf.get(
+                    PigStatsOutputSizeReader.OUTPUT_SIZE_READER_UNSUPPORTED);
+            if (unsupported != null) {
+                String storeFuncName = sto.getStoreFunc().getClass().getCanonicalName();
+                for (String s : unsupported.split(",")) {
+                    if (s.equalsIgnoreCase(storeFuncName)) {
+                        return false;
+                    }
                 }
             }
         }
-        return UriUtil.isHDFSFileOrLocalOrS3N(getLocationUri(sto), conf);
+        return nullOrSupportedScheme;
     }
 
     /**

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1637725&r1=1637724&r2=1637725&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Sun Nov  9 19:44:01 2014
@@ -93,6 +93,7 @@ public class PigInputFormat extends Inpu
         Configuration conf = context.getConfiguration();
         PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer
                 .deserialize(conf.get("udf.import.list")));
+        MapRedUtil.setupUDFContext(conf);
         LoadFunc loadFunc = getLoadFunc(pigSplit.getInputIndex(), conf);
         // Pass loader signature to LoadFunc and to InputFormat through
         // the conf

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1637725&r1=1637724&r2=1637725&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Sun Nov  9 19:44:01 2014
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.pig.PigConfiguration;
+import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.common.counters.TezCounters;
@@ -153,6 +154,7 @@ public class TezJob implements Runnable 
 
     @Override
     public void run() {
+        UDFContext udfContext = UDFContext.getUDFContext();
         try {
             tezClient = TezSessionManager.getClient(conf, requestAMResources,
                     dag.getCredentials(), tezJobConf);
@@ -182,12 +184,15 @@ public class TezJob implements Runnable 
             }
 
             if (dagStatus.isCompleted()) {
+                // For tez_local mode where PigProcessor destroys all UDFContext
+                UDFContext.setUdfContext(udfContext);
+
                 log.info("DAG Status: " + dagStatus);
                 dagCounters = dagStatus.getDAGCounters();
                 TezSessionManager.freeSession(tezClient);
                 try {
                     pigStats.accumulateStats(this);
-                } catch (IOException e) {
+                } catch (Exception e) {
                     log.warn("Exception while gathering stats", e);
                 }
                 try {

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1637725&r1=1637724&r2=1637725&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Sun Nov  9 19:44:01 2014
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -34,6 +35,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.PigConfiguration;
+import org.apache.pig.PigException;
 import org.apache.pig.PigWarning;
 import org.apache.pig.backend.BackendException;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -62,6 +64,7 @@ import org.apache.pig.impl.plan.Compilat
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.LogUtils;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
@@ -90,8 +93,10 @@ public class TezLauncher extends Launche
 
     public TezLauncher() {
         if (namedThreadFactory == null) {
-            namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(
-                    "PigTezLauncher-%d").build();
+            namedThreadFactory = new ThreadFactoryBuilder()
+                .setNameFormat("PigTezLauncher-%d")
+                .setUncaughtExceptionHandler(new JobControlThreadExceptionHandler())
+                .build();
         }
         executor = Executors.newSingleThreadExecutor(namedThreadFactory);
     }
@@ -154,18 +159,15 @@ public class TezLauncher extends Launche
 
                 // Set the thread UDFContext so registered classes are available.
                 final UDFContext udfContext = UDFContext.getUDFContext();
-                Thread task = new Thread(runningJob) {
+                Runnable task = new Runnable() {
                     @Override
                     public void run() {
+                        Thread.currentThread().setContextClassLoader(PigContext.getClassLoader());
                         UDFContext.setUdfContext(udfContext.clone());
-                        super.run();
+                        runningJob.run();
                     }
                 };
 
-                JobControlThreadExceptionHandler jctExceptionHandler = new JobControlThreadExceptionHandler();
-                task.setUncaughtExceptionHandler(jctExceptionHandler);
-                task.setContextClassLoader(PigContext.getClassLoader());
-
                 // Mark the times that the jobs were submitted so it's reflected in job
                 // history props. TODO: Fix this. unused now
                 long scriptSubmittedTimestamp = System.currentTimeMillis();
@@ -193,6 +195,15 @@ public class TezLauncher extends Launche
                     reporter.notifyUpdate();
                     Thread.sleep(1000);
                 }
+                // For tez_local mode where PigProcessor destroys all UDFContext
+                UDFContext.setUdfContext(udfContext);
+                try {
+                    // In case of FutureTask there is no uncaught exception
+                    // Need to do future.get() to get any exception
+                    future.get();
+                } catch (ExecutionException e) {
+                    setJobException(e.getCause());
+                }
             }
             processedDAGs++;
             if (tezPlanContainer.size() == processedDAGs) {
@@ -201,6 +212,7 @@ public class TezLauncher extends Launche
                 tezScriptState.emitProgressUpdatedNotification(
                     ((tezPlanContainer.size() - processedDAGs)/tezPlanContainer.size()) * 100);
             }
+            handleUnCaughtException(pc);
             tezPlanContainer.updatePlan(tezPlan, reporter.notifyFinishedOrFailed());
         }
 
@@ -231,6 +243,28 @@ public class TezLauncher extends Launche
         return tezStats;
     }
 
+    private void handleUnCaughtException(PigContext pc) throws Exception {
+      //check for the uncaught exceptions from TezJob thread
+        //if the job controller fails before launching the jobs then there are
+        //no jobs to check for failure
+        if (jobControlException != null) {
+            if (jobControlException instanceof PigException) {
+                if (jobControlExceptionStackTrace != null) {
+                    LogUtils.writeLog("Error message from Tez Job",
+                            jobControlExceptionStackTrace, pc
+                            .getProperties().getProperty(
+                                    "pig.logfile"), log);
+                }
+                throw jobControlException;
+            } else {
+                int errCode = 2117;
+                String msg = "Unexpected error when launching Tez job.";
+                throw new ExecException(msg, errCode, PigException.BUG,
+                        jobControlException);
+            }
+        }
+    }
+
     private void computeWarningAggregate(Map<String, Map<String, Long>> counterGroups, Map<Enum, Long> aggMap) {
         for (Map<String, Long> counters : counterGroups.values()) {
             for (Enum e : PigWarning.values()) {

Modified: pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java?rev=1637725&r1=1637724&r2=1637725&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java Sun Nov  9 19:44:01 2014
@@ -249,7 +249,11 @@ public class TezVertexStats extends JobS
                     counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_WRITTEN) != null) {
                 hdfsBytesWritten = counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_WRITTEN);
             } else {
-                hdfsBytesWritten = JobStats.getOutputSize(sto, conf);
+                try {
+                    hdfsBytesWritten = JobStats.getOutputSize(sto, conf);
+                } catch (Exception e) {
+                    LOG.warn("Error while getting the bytes written for the output " + sto.getSFile(), e);
+                }
             }
 
             OutputStats os = new OutputStats(filename, hdfsBytesWritten,

Modified: pig/branches/branch-0.14/test/org/apache/pig/test/TestHBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/TestHBaseStorage.java?rev=1637725&r1=1637724&r2=1637725&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/TestHBaseStorage.java Sun Nov  9 19:44:01 2014
@@ -49,6 +49,7 @@ import org.apache.pig.data.Tuple;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -101,7 +102,7 @@ public class TestHBaseStorage {
 
     @Before
     public void beforeTest() throws Exception {
-        pig = new PigServer(ExecType.LOCAL, conf);
+        pig = new PigServer(Util.getLocalTestMode(), conf);
     }
 
     @After
@@ -125,6 +126,7 @@ public class TestHBaseStorage {
             deletes.add(new Delete(row.getRow()));
         }
         table.delete(deletes);
+        table.close();
     }
 
     /**
@@ -825,7 +827,7 @@ public class TestHBaseStorage {
         Assert.assertEquals(100, index);
         LOG.info("testLoadWithProjection_2 done");
     }
-    
+
     /**
      * Test merge inner join with two tables
      *
@@ -833,6 +835,7 @@ public class TestHBaseStorage {
      */
     @Test
     public void testMergeJoin() throws IOException {
+        Assume.assumeTrue("Skip this test for TEZ. See PIG-4315", pig.getPigContext().getExecType().equals(ExecType.LOCAL));
         prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
         prepareTable(TESTTABLE_2, true, DataFormat.HBaseBinary);
         pig.registerQuery("a = load 'hbase://" + TESTTABLE_1 + "' using "
@@ -853,7 +856,7 @@ public class TestHBaseStorage {
             Tuple t = it.next();
             // the columns for both relations should be merged into one tuple
             // left side
-            String rowKey = (String) t.get(0);            
+            String rowKey = (String) t.get(0);
             int col_a = (Integer) t.get(1);
             double col_b = (Double) t.get(2);
             String col_c = (String) t.get(3);
@@ -875,7 +878,7 @@ public class TestHBaseStorage {
             Assert.assertEquals(count, col_a2);
             Assert.assertEquals(count + 0.0, col_b2, 1e-6);
             Assert.assertEquals("Text_" + count, col_c2);
-            
+
             count++;
         }
         Assert.assertEquals(count, TEST_ROW_COUNT);
@@ -883,9 +886,9 @@ public class TestHBaseStorage {
     }
 
     /**
-     * Test collected group 
+     * Test collected group
      * not much to test here since keys are unique
-     * 
+     *
      * @throws IOException
      */
     @Test
@@ -920,7 +923,7 @@ public class TestHBaseStorage {
                 int col_a = (Integer) row.get(1);
                 double col_b = (Double) row.get(2);
                 String col_c = (String) row.get(3);
-                
+
                 Assert.assertEquals(count, col_a);
                 Assert.assertEquals(count + 0.0, col_b, 1e-6);
                 Assert.assertEquals("Text_" + count, col_c);
@@ -1002,6 +1005,7 @@ public class TestHBaseStorage {
 
         pig.getPigContext().getProperties()
                 .setProperty(MRConfiguration.FILEOUTPUTCOMMITTER_MARKSUCCESSFULJOBS, "false");
+        table.close();
     }
 
     /**
@@ -1038,6 +1042,7 @@ public class TestHBaseStorage {
             Assert.assertEquals(i + 0.0, col_b, 1e-6);
         }
         Assert.assertEquals(100, i);
+        table.close();
     }
 
     /**
@@ -1074,6 +1079,7 @@ public class TestHBaseStorage {
             Assert.assertEquals("Text_" + i, col_c);
         }
         Assert.assertEquals(100, i);
+        table.close();
     }
 
     /**
@@ -1136,6 +1142,7 @@ public class TestHBaseStorage {
             Assert.assertEquals(i + 0.0, col_b, 1e-6);
         }
         Assert.assertEquals(100, i);
+        table.close();
     }
 
     /**
@@ -1171,6 +1178,7 @@ public class TestHBaseStorage {
             Assert.assertEquals(i + 0.0 + "", col_b);
         }
         Assert.assertEquals(100, i);
+        table.close();
     }
 
     /**

Modified: pig/branches/branch-0.14/test/tez-local-tests
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/tez-local-tests?rev=1637725&r1=1637724&r2=1637725&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/tez-local-tests (original)
+++ pig/branches/branch-0.14/test/tez-local-tests Sun Nov  9 19:44:01 2014
@@ -1,4 +1,5 @@
 **/TestAccumuloPigCluster.java
 **/TestBigTypeSort.java
 **/TestCurrentTime.java
+**/TestHBaseStorage.java
 **/TestInvokerGenerator.java