Posted to commits@pig.apache.org by pr...@apache.org on 2014/11/27 13:50:02 UTC

svn commit: r1642132 [6/14] - in /pig/branches/spark: ./ bin/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/datetime/convert/ contrib/piggybank/java/sr...

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Thu Nov 27 12:49:54 2014
@@ -23,10 +23,11 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ThreadFactory;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,6 +35,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.PigConfiguration;
+import org.apache.pig.PigException;
 import org.apache.pig.PigWarning;
 import org.apache.pig.backend.BackendException;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -41,40 +43,64 @@ import org.apache.pig.backend.hadoop.dat
 import org.apache.pig.backend.hadoop.executionengine.Launcher;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.LoaderProcessor;
-import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.NoopFilterRemover;
-import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.ParallelismSetter;
-import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.UnionOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezCompiler;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPOPackageAnnotator;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerPrinter;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.NativeTezOper;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.AccumulatorOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.CombinerOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.LoaderProcessor;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.MultiQueryOptimizerTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.NoopFilterRemover;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.SecondaryKeyOptimizerTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.UnionOptimizer;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
 import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.LogUtils;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
 import org.apache.pig.tools.pigstats.tez.TezScriptState;
-import org.apache.pig.tools.pigstats.tez.TezStats;
-import org.apache.pig.tools.pigstats.tez.TezTaskStats;
-import org.apache.tez.common.TezUtils;
+import org.apache.pig.tools.pigstats.tez.TezVertexStats;
+import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * Main class that launches pig for Tez
  */
 public class TezLauncher extends Launcher {
     private static final Log log = LogFactory.getLog(TezLauncher.class);
-    private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);
+    private static ThreadFactory namedThreadFactory;
+    private ExecutorService executor;
     private boolean aggregateWarning = false;
     private TezScriptState tezScriptState;
-    private TezStats tezStats;
+    private TezPigScriptStats tezStats;
     private TezJob runningJob;
 
+    public TezLauncher() {
+        if (namedThreadFactory == null) {
+            namedThreadFactory = new ThreadFactoryBuilder()
+                .setNameFormat("PigTezLauncher-%d")
+                .setUncaughtExceptionHandler(new JobControlThreadExceptionHandler())
+                .build();
+        }
+        executor = Executors.newSingleThreadExecutor(namedThreadFactory);
+    }
+
     @Override
     public PigStats launchPig(PhysicalPlan php, String grpName, PigContext pc) throws Exception {
         if (pc.getExecType().isLocal()) {
@@ -83,7 +109,7 @@ public class TezLauncher extends Launche
             pc.getProperties().setProperty("tez.ignore.lib.uris", "true");
         }
         Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties(), true);
-        if (pc.defaultParallel == -1 && !conf.getBoolean(PigConfiguration.TEZ_AUTO_PARALLELISM, true)) {
+        if (pc.defaultParallel == -1 && !conf.getBoolean(PigConfiguration.PIG_TEZ_AUTO_PARALLELISM, true)) {
             pc.defaultParallel = 1;
         }
         aggregateWarning = conf.getBoolean("aggregate.warning", false);
@@ -98,60 +124,52 @@ public class TezLauncher extends Launche
         List<TezOperPlan> processedPlans = new ArrayList<TezOperPlan>();
 
         tezScriptState = TezScriptState.get();
-        tezStats = new TezStats(pc);
+        tezStats = new TezPigScriptStats(pc);
         PigStats.start(tezStats);
 
         conf.set(TezConfiguration.TEZ_USE_CLUSTER_HADOOP_LIBS, "true");
         TezJobCompiler jc = new TezJobCompiler(pc, conf);
         TezPlanContainer tezPlanContainer = compile(php, pc);
 
-        int defaultTimeToSleep = pc.getExecType().isLocal() ? 100 : 1000;
-        int timeToSleep = conf.getInt("pig.jobcontrol.sleep", defaultTimeToSleep);
-        if (timeToSleep != defaultTimeToSleep) {
-            log.info("overriding default JobControl sleep (" +
-                    defaultTimeToSleep + ") to " + timeToSleep);
-        }
+        tezStats.initialize(tezPlanContainer);
+        tezScriptState.emitInitialPlanNotification(tezPlanContainer);
+        tezScriptState.emitLaunchStartedNotification(tezPlanContainer.size()); // number of DAGs to launch
 
+        TezPlanContainerNode tezPlanContainerNode;
         TezOperPlan tezPlan;
-        while ((tezPlan=tezPlanContainer.getNextPlan(processedPlans)) != null) {
-            optimize(tezPlan, pc);
+        int processedDAGs = 0;
+        while ((tezPlanContainerNode = tezPlanContainer.getNextPlan(processedPlans)) != null) {
+            tezPlan = tezPlanContainerNode.getTezOperPlan();
+            processLoadAndParallelism(tezPlan, pc);
             processedPlans.add(tezPlan);
-            ProgressReporter reporter = new ProgressReporter();
-            tezStats.initialize(tezPlan);
+            ProgressReporter reporter = new ProgressReporter(tezPlanContainer.size(), processedDAGs);
             if (tezPlan.size()==1 && tezPlan.getRoots().get(0) instanceof NativeTezOper) {
                 // Native Tez Plan
                 NativeTezOper nativeOper = (NativeTezOper)tezPlan.getRoots().get(0);
-                tezScriptState.emitInitialPlanNotification(tezPlan);
-                tezScriptState.emitLaunchStartedNotification(tezPlan.size());
                 tezScriptState.emitJobsSubmittedNotification(1);
-                nativeOper.runJob();
+                nativeOper.runJob(tezPlanContainerNode.getOperatorKey().toString());
             } else {
                 TezPOPackageAnnotator pkgAnnotator = new TezPOPackageAnnotator(tezPlan);
                 pkgAnnotator.visit();
 
-                runningJob = jc.compile(tezPlan, grpName, tezPlanContainer);
-    
-                tezScriptState.emitInitialPlanNotification(tezPlan);
-                tezScriptState.emitLaunchStartedNotification(tezPlan.size());
-    
+                runningJob = jc.compile(tezPlanContainerNode, tezPlanContainer);
+                // TODO: Exclude vertex groups from numVerticesToLaunch?
+                tezScriptState.dagLaunchNotification(runningJob.getName(), tezPlan, tezPlan.size());
+                runningJob.setPigStats(tezStats);
+
                 // Set the thread UDFContext so registered classes are available.
                 final UDFContext udfContext = UDFContext.getUDFContext();
-                Thread task = new Thread(runningJob) {
+                Runnable task = new Runnable() {
                     @Override
                     public void run() {
+                        Thread.currentThread().setContextClassLoader(PigContext.getClassLoader());
                         UDFContext.setUdfContext(udfContext.clone());
-                        super.run();
+                        runningJob.run();
                     }
                 };
-    
-                JobControlThreadExceptionHandler jctExceptionHandler = new JobControlThreadExceptionHandler();
-                task.setUncaughtExceptionHandler(jctExceptionHandler);
-                task.setContextClassLoader(PigContext.getClassLoader());
-    
-                tezStats.setTezJob(runningJob);
-    
+
                 // Mark the times that the jobs were submitted so it's reflected in job
-                // history props
+                // history props. TODO: fix this; currently unused.
                 long scriptSubmittedTimestamp = System.currentTimeMillis();
                 // Job.getConfiguration returns the shared configuration object
                 Configuration jobConf = runningJob.getConfiguration();
@@ -159,20 +177,42 @@ public class TezLauncher extends Launche
                         Long.toString(scriptSubmittedTimestamp));
                 jobConf.set("pig.job.submitted.timestamp",
                         Long.toString(System.currentTimeMillis()));
-    
-                Future<?> future = executor.schedule(task, timeToSleep, TimeUnit.MILLISECONDS);
-                
+
+                Future<?> future = executor.submit(task);
                 tezScriptState.emitJobsSubmittedNotification(1);
-                reporter.notifyStarted();
-    
+
+                boolean jobStarted = false;
+
                 while (!future.isDone()) {
+                    if (!jobStarted && runningJob.getApplicationId() != null) {
+                        jobStarted = true;
+                        String appId = runningJob.getApplicationId().toString();
+                        // Log the job id in MR format for Oozie Pig action compatibility
+                        log.info("HadoopJobId: "+ appId.replace("application", "job"));
+                        tezScriptState.emitJobStartedNotification(appId);
+                        tezScriptState.dagStartedNotification(runningJob.getName(), appId);
+                    }
                     reporter.notifyUpdate();
                     Thread.sleep(1000);
                 }
-    
-                tezStats.accumulateStats(runningJob);
+                // Restore the UDFContext, which PigProcessor destroys in tez_local mode
+                UDFContext.setUdfContext(udfContext);
+                try {
+                    // A FutureTask never hits the uncaught exception handler;
+                    // call future.get() to surface any exception from the job
+                    future.get();
+                } catch (ExecutionException e) {
+                    setJobException(e.getCause());
+                }
             }
-            tezScriptState.emitProgressUpdatedNotification(100);
+            processedDAGs++;
+            if (tezPlanContainer.size() == processedDAGs) {
+                tezScriptState.emitProgressUpdatedNotification(100);
+            } else {
+                tezScriptState.emitProgressUpdatedNotification(
+                    (processedDAGs * 100) / tezPlanContainer.size());
+            }
+            handleUnCaughtException(pc);
             tezPlanContainer.updatePlan(tezPlan, reporter.notifyFinishedOrFailed());
         }
 
@@ -203,6 +243,28 @@ public class TezLauncher extends Launche
         return tezStats;
     }
 
+    private void handleUnCaughtException(PigContext pc) throws Exception {
+        // Check for uncaught exceptions from the TezJob thread. If the job
+        // controller fails before launching the jobs, there are no jobs to
+        // check for failure.
+        if (jobControlException != null) {
+            if (jobControlException instanceof PigException) {
+                if (jobControlExceptionStackTrace != null) {
+                    LogUtils.writeLog("Error message from Tez Job",
+                            jobControlExceptionStackTrace,
+                            pc.getProperties().getProperty("pig.logfile"),
+                            log);
+                }
+                throw jobControlException;
+            } else {
+                int errCode = 2117;
+                String msg = "Unexpected error when launching Tez job.";
+                throw new ExecException(msg, errCode, PigException.BUG,
+                        jobControlException);
+            }
+        }
+    }
+
     private void computeWarningAggregate(Map<String, Map<String, Long>> counterGroups, Map<Enum, Long> aggMap) {
         for (Map<String, Long> counters : counterGroups.values()) {
             for (Enum e : PigWarning.values()) {
@@ -223,53 +285,57 @@ public class TezLauncher extends Launche
     }
 
     private class ProgressReporter {
+        private int totalDAGs;
+        private int processedDAGs;
         private int count = 0;
         private int prevProgress = 0;
 
-        public void notifyStarted() throws IOException {
-            for (Vertex v : runningJob.getDAG().getVertices()) {
-                TezTaskStats tts = tezStats.getVertexStats(v.getName());
-                UserPayload payload = v.getProcessorDescriptor().getUserPayload();
-                Configuration conf = TezUtils.createConfFromUserPayload(payload);
-                tts.setConf(conf);
-                tts.setId(v.getName());
-                tezScriptState.emitJobStartedNotification(v.getName());
-            }
+        public ProgressReporter(int totalDAGs, int processedDAGs) {
+            this.totalDAGs = totalDAGs;
+            this.processedDAGs = processedDAGs;
         }
 
         public void notifyUpdate() {
             DAGStatus dagStatus = runningJob.getDAGStatus();
             if (dagStatus != null && dagStatus.getState() == DAGStatus.State.RUNNING) {
                 // Emit notification when the job has progressed more than 1%,
-                // or every 10 second
+                // or every 100 status polls (about 100 seconds at the 1s poll interval)
                 int currProgress = Math.round(runningJob.getDAGProgress() * 100f);
                 if (currProgress - prevProgress >= 1 || count % 100 == 0) {
-                    tezScriptState.emitProgressUpdatedNotification(currProgress);
+                    tezScriptState.dagProgressNotification(runningJob.getName(), -1, currProgress);
+                    tezScriptState.emitProgressUpdatedNotification((currProgress + (100 * processedDAGs)) / totalDAGs);
                     prevProgress = currProgress;
                 }
                 count++;
             }
+            // TODO: Add new vertex tracking methods to PigTezProgressNotificationListener
+            // and emit notifications for individual vertex start, progress and completion
         }
 
         public boolean notifyFinishedOrFailed() {
             DAGStatus dagStatus = runningJob.getDAGStatus();
+            if (dagStatus == null) {
+                return false;
+            }
             if (dagStatus.getState() == DAGStatus.State.SUCCEEDED) {
                 Map<Enum, Long> warningAggMap = new HashMap<Enum, Long>();
-                for (Vertex v : runningJob.getDAG().getVertices()) {
-                    TezTaskStats tts = tezStats.getVertexStats(v.getName());
-                    tezScriptState.emitjobFinishedNotification(tts);
-                    Map<String, Map<String, Long>> counterGroups = runningJob.getVertexCounters(v.getName());
-                    computeWarningAggregate(counterGroups, warningAggMap);
+                DAG dag = runningJob.getDAG();
+                for (Vertex v : dag.getVertices()) {
+                    TezVertexStats tts = tezStats.getVertexStats(dag.getName(), v.getName());
+                    if (tts == null) {
+                        continue; //vertex groups
+                    }
+                    Map<String, Map<String, Long>> counterGroups = tts.getCounters();
+                    if (counterGroups == null) {
+                        log.warn("Counters are not available for vertex " + v.getName() + ". Not computing warning aggregates.");
+                    } else {
+                        computeWarningAggregate(counterGroups, warningAggMap);
+                    }
                 }
                 if (aggregateWarning) {
                     CompilationMessageCollector.logAggregate(warningAggMap, MessageType.Warning, log);
                 }
                 return true;
-            } else if (dagStatus.getState() == DAGStatus.State.FAILED) {
-                for (Vertex v : ((TezJob)runningJob).getDAG().getVertices()) {
-                    TezTaskStats tts = tezStats.getVertexStats(v.getName());
-                    tezScriptState.emitJobFailedNotification(tts);
-                }
             }
             return false;
         }
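
To see how the overall progress formula above behaves, take totalDAGs = 3
with the second DAG (processedDAGs = 1) at 40%: (40 + 100 * 1) / 3 = 46,
so 46% is reported overall. A standalone restatement of the arithmetic
(illustrative only):

    public class ProgressMathDemo {
        // Mirrors the formula in ProgressReporter.notifyUpdate() above.
        static int overallProgress(int currProgress, int processedDAGs, int totalDAGs) {
            return (currProgress + (100 * processedDAGs)) / totalDAGs;
        }

        public static void main(String[] args) {
            System.out.println(overallProgress(40, 1, 3));  // 46
            System.out.println(overallProgress(100, 2, 3)); // 100, last DAG done
        }
    }
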
@@ -281,10 +347,6 @@ public class TezLauncher extends Launche
             VisitorException, IOException {
         log.debug("Entering TezLauncher.explain");
         TezPlanContainer tezPlanContainer = compile(php, pc);
-        for (Map.Entry<OperatorKey,TezPlanContainerNode> entry : tezPlanContainer.getKeys().entrySet()) {
-            TezOperPlan tezPlan = entry.getValue().getNode();
-            optimize(tezPlan, pc);
-        }
 
         if (format.equals("text")) {
             TezPlanContainerPrinter printer = new TezPlanContainerPrinter(ps, tezPlanContainer);
@@ -296,7 +358,20 @@ public class TezLauncher extends Launche
         }
     }
 
-    public static void optimize(TezOperPlan tezPlan, PigContext pc) throws VisitorException {
+    public TezPlanContainer compile(PhysicalPlan php, PigContext pc)
+            throws PlanException, IOException, VisitorException {
+        TezCompiler comp = new TezCompiler(php, pc);
+        comp.compile();
+        TezPlanContainer planContainer = comp.getPlanContainer();
+        for (Map.Entry<OperatorKey, TezPlanContainerNode> entry : planContainer
+                .getKeys().entrySet()) {
+            TezOperPlan tezPlan = entry.getValue().getTezOperPlan();
+            optimize(tezPlan, pc);
+        }
+        return planContainer;
+    }
+
+    private void optimize(TezOperPlan tezPlan, PigContext pc) throws VisitorException {
         Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
         boolean aggregateWarning = conf.getBoolean("aggregate.warning", false);
 
@@ -304,10 +379,10 @@ public class TezLauncher extends Launche
         filter.visit();
 
         // Run CombinerOptimizer on Tez plan
-        boolean nocombiner = conf.getBoolean(PigConfiguration.PROP_NO_COMBINER, false);
+        boolean nocombiner = conf.getBoolean(PigConfiguration.PIG_EXEC_NO_COMBINER, false);
         if (!pc.inIllustrator && !nocombiner)  {
             boolean doMapAgg = Boolean.parseBoolean(pc.getProperties().getProperty(
-                    PigConfiguration.PROP_EXEC_MAP_PARTAGG, "false"));
+                    PigConfiguration.PIG_EXEC_MAP_PARTAGG, "false"));
             CombinerOptimizer co = new CombinerOptimizer(tezPlan, doMapAgg);
             co.visit();
             co.getMessageCollector().logMessages(MessageType.Warning, aggregateWarning, log);
@@ -321,7 +396,7 @@ public class TezLauncher extends Launche
             skOptimizer.visit();
         }
 
-        boolean isMultiQuery = conf.getBoolean(PigConfiguration.OPT_MULTIQUERY, true);
+        boolean isMultiQuery = conf.getBoolean(PigConfiguration.PIG_OPT_MULTIQUERY, true);
         if (isMultiQuery) {
             // reduces the number of TezOpers in the Tez plan generated
             // by multi-query (multi-store) script.
@@ -330,35 +405,32 @@ public class TezLauncher extends Launche
         }
 
         // Run AccumulatorOptimizer on Tez plan
-        boolean isAccum = conf.getBoolean(PigConfiguration.OPT_ACCUMULATOR, true);
+        boolean isAccum = conf.getBoolean(PigConfiguration.PIG_OPT_ACCUMULATOR, true);
         if (isAccum) {
             AccumulatorOptimizer accum = new AccumulatorOptimizer(tezPlan);
             accum.visit();
         }
 
         // Use VertexGroup in Tez
-        boolean isUnionOpt = conf.getBoolean(PigConfiguration.TEZ_OPT_UNION, true);
+        boolean isUnionOpt = conf.getBoolean(PigConfiguration.PIG_TEZ_OPT_UNION, true);
         if (isUnionOpt) {
             UnionOptimizer uo = new UnionOptimizer(tezPlan);
             uo.visit();
         }
 
+    }
+
+    public static void processLoadAndParallelism(TezOperPlan tezPlan, PigContext pc) throws VisitorException {
         if (!pc.inExplain && !pc.inDumpSchema) {
             LoaderProcessor loaderStorer = new LoaderProcessor(tezPlan, pc);
             loaderStorer.visit();
 
             ParallelismSetter parallelismSetter = new ParallelismSetter(tezPlan, pc);
             parallelismSetter.visit();
+            tezPlan.setEstimatedParallelism(parallelismSetter.getEstimatedTotalParallelism());
         }
     }
 
-    public TezPlanContainer compile(PhysicalPlan php, PigContext pc)
-            throws PlanException, IOException, VisitorException {
-        TezCompiler comp = new TezCompiler(php, pc);
-        comp.compile();
-        return comp.getPlanContainer();
-    }
-
     @Override
     public void kill() throws BackendException {
         if (runningJob != null) {
@@ -368,9 +440,7 @@ public class TezLauncher extends Launche
                 throw new BackendException(e);
             }
         }
-        if (executor != null) {
-            executor.shutdownNow();
-        }
+        destroy();
     }
 
     @Override
@@ -385,4 +455,17 @@ public class TezLauncher extends Launche
             log.info("Cannot find job: " + jobID);
         }
     }
+
+    @Override
+    public void destroy() {
+        try {
+            if (executor != null && !executor.isShutdown()) {
+                log.info("Shutting down thread pool");
+                executor.shutdownNow();
+            }
+        } catch (Exception e) {
+            log.warn("Error shutting down threadpool");
+        }
+    }
+
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java Thu Nov 27 12:49:54 2014
@@ -33,7 +33,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.pig.backend.hadoop.datastorage.HPath;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 
@@ -43,6 +42,7 @@ public class TezResourceManager {
     private Path stagingDir;
     private FileSystem remoteFs;
     private Configuration conf;
+    private PigContext pigContext;
     public Map<String, Path> resources = new HashMap<String, Path>();
 
     static public TezResourceManager getInstance() {
@@ -54,9 +54,10 @@ public class TezResourceManager {
 
     public void init(PigContext pigContext, Configuration conf) throws IOException {
         if (!inited) {
-            this.stagingDir = ((HPath)FileLocalizer.getTemporaryResourcePath(pigContext)).getPath();
+            this.stagingDir = FileLocalizer.getTemporaryResourcePath(pigContext);
             this.remoteFs = FileSystem.get(conf);
             this.conf = conf;
+            this.pigContext = pigContext;
             this.inited = true;
         }
     }
@@ -77,7 +78,7 @@ public class TezResourceManager {
             }
 
             // Ship the local resource to the staging directory on the remote FS
-            if (uri.toString().startsWith("file:")) {
+            if (!pigContext.getExecType().isLocal() && uri.toString().startsWith("file:")) {
                 Path remoteFsPath = remoteFs.makeQualified(new Path(stagingDir, resourceName));
                 remoteFs.copyFromLocalFile(resourcePath, remoteFsPath);
                 remoteFs.setReplication(remoteFsPath, (short)conf.getInt(Job.SUBMIT_REPLICATION, 3));

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java Thu Nov 27 12:49:54 2014
@@ -29,8 +29,11 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezJob.TezJobConfig;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.tools.pigstats.tez.TezScriptState;
 import org.apache.tez.client.TezAppMasterStatus;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -80,9 +83,13 @@ public class TezSessionManager {
     private static List<SessionInfo> sessionPool = new ArrayList<SessionInfo>();
 
     private static SessionInfo createSession(Configuration conf,
-            Map<String, LocalResource> requestedAMResources, Credentials creds)
-            throws TezException, IOException, InterruptedException {
+            Map<String, LocalResource> requestedAMResources, Credentials creds,
+            TezJobConfig tezJobConf) throws TezException, IOException,
+            InterruptedException {
         TezConfiguration amConf = MRToTezHelper.getDAGAMConfFromMRConf(conf);
+        TezScriptState ss = TezScriptState.get();
+        ss.addDAGSettingsToConf(amConf);
+        adjustAMConfig(amConf, tezJobConf);
         String jobName = conf.get(PigContext.JOB_NAME, "pig");
         TezClient tezClient = TezClient.create(jobName, amConf, true, requestedAMResources, creds);
         tezClient.start();
@@ -94,6 +101,56 @@ public class TezSessionManager {
         return new SessionInfo(tezClient, requestedAMResources);
     }
 
+    private static void adjustAMConfig(TezConfiguration amConf, TezJobConfig tezJobConf) {
+        int requiredAMMaxHeap = -1;
+        int requiredAMResourceMB = -1;
+        int configuredAMMaxHeap = Utils.extractHeapSizeInMB(amConf.get(
+                TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
+                TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT));
+        int configuredAMResourceMB = amConf.getInt(
+                TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB,
+                TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB_DEFAULT);
+
+        if (tezJobConf.getEstimatedTotalParallelism() > 0) {
+
+            int minAMMaxHeap = 3584;
+            int minAMResourceMB = 4096;
+
+            // Rough estimation. For 5K tasks 1G Xmx and 1.5G resource.mb
+            // Increment by 512 mb for every additional 5K tasks.
+            for (int taskCount = 30000; taskCount >= 5000; taskCount -= 5000) {
+                if (tezJobConf.getEstimatedTotalParallelism() > taskCount) {
+                    requiredAMMaxHeap = minAMMaxHeap;
+                    requiredAMResourceMB = minAMResourceMB;
+                    break;
+                }
+                minAMMaxHeap = minAMMaxHeap - 512;
+                minAMResourceMB = minAMResourceMB - 512;
+            }
+
+            if (requiredAMResourceMB > -1 && configuredAMResourceMB < requiredAMResourceMB) {
+                amConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, requiredAMResourceMB);
+                log.info("Increasing "
+                        + TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB + " from "
+                        + configuredAMResourceMB + " to "
+                        + requiredAMResourceMB
+                        + " as the number of total estimated tasks is "
+                        + tezJobConf.getEstimatedTotalParallelism());
+
+                if (requiredAMMaxHeap > -1 && configuredAMMaxHeap < requiredAMMaxHeap) {
+                    amConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
+                            amConf.get(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS)
+                                    + " -Xmx" + requiredAMMaxHeap + "M");
+                    log.info("Increasing Tez AM Heap Size from "
+                            + configuredAMMaxHeap + "M to "
+                            + requiredAMMaxHeap
+                            + "M as the number of total estimated tasks is "
+                            + tezJobConf.getEstimatedTotalParallelism());
+                }
+            }
+        }
+    }
+
     private static boolean validateSessionResources(SessionInfo currentSession,
             Map<String, LocalResource> requestedAMResources)
             throws TezException, IOException {
@@ -106,7 +163,7 @@ public class TezSessionManager {
     }
 
     static TezClient getClient(Configuration conf, Map<String, LocalResource> requestedAMResources,
-            Credentials creds) throws TezException, IOException, InterruptedException {
+            Credentials creds, TezJobConfig tezJobConf) throws TezException, IOException, InterruptedException {
         List<SessionInfo> sessionsToRemove = new ArrayList<SessionInfo>();
         SessionInfo newSession = null;
         sessionPoolLock.readLock().lock();
@@ -135,11 +192,12 @@ public class TezSessionManager {
         // We cannot find available AM, create new one
         // Create session outside of locks so that getClient/freeSession is not
         // blocked for parallel embedded pig runs
-        newSession = createSession(conf, requestedAMResources, creds);
+        newSession = createSession(conf, requestedAMResources, creds, tezJobConf);
         newSession.inUse = true;
         sessionPoolLock.writeLock().lock();
         try {
             if (shutdown == true) {
+                log.info("Shutting down Tez session " + newSession.session);
                 newSession.session.stop();
                 throw new IOException("TezSessionManager is shut down");
             }
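
Walking the adjustAMConfig loop shown earlier in this file for a few
parallelism estimates gives the effective tiers: up to 5000 tasks nothing
changes, 5001-10000 tasks request -Xmx1024M and 1536 MB of AM memory, and
every further 5000 tasks adds 512 MB to both, topping out at 3584M/4096 MB
beyond 30000 tasks. A standalone re-derivation of those tiers
(illustrative only, not part of this patch):

    public class AmSizingDemo {
        // Re-implements the tier computation from adjustAMConfig.
        // Returns {heapMB, resourceMB}, or {-1, -1} when no override applies.
        static int[] requiredAmSize(int estimatedTasks) {
            int heap = 3584, resource = 4096;
            for (int taskCount = 30000; taskCount >= 5000; taskCount -= 5000) {
                if (estimatedTasks > taskCount) {
                    return new int[] { heap, resource };
                }
                heap -= 512;
                resource -= 512;
            }
            return new int[] { -1, -1 };
        }

        public static void main(String[] args) {
            // 4000 -> no override; 6000 -> 1024M/1536MB; 12000 -> 1536M/2048MB;
            // 35000 -> 3584M/4096MB.
            for (int tasks : new int[] { 4000, 6000, 12000, 35000 }) {
                int[] size = requiredAmSize(tasks);
                System.out.println(tasks + " -> " + size[0] + "M / " + size[1] + "MB");
            }
        }
    }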

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java Thu Nov 27 12:49:54 2014
@@ -26,6 +26,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -35,7 +36,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 
 @InterfaceAudience.Private
 public class MRToTezHelper {
@@ -117,6 +117,12 @@ public class MRToTezHelper {
         dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES, ""
                 + amCores);
 
+        dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_VIEW_ACLS,
+                tezConf.get(MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
+
+        dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_MODIFY_ACLS,
+                tezConf.get(MRJobConfig.JOB_ACL_MODIFY_JOB, MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));
+
         dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, ""
                 + dagAMConf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
                         MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
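
setIfUnset only fills keys that have no explicit value, so the two new ACL
mappings let an MR-style job ACL carry over to the Tez AM while an
explicitly configured Tez ACL still wins. A small sketch of that
precedence (the string literals are the values behind the MRJobConfig and
TezConfiguration constants used above):

    import org.apache.hadoop.conf.Configuration;

    public class AclMappingDemo {
        public static void main(String[] args) {
            Configuration mrConf = new Configuration(false);
            mrConf.set("mapreduce.job.acl-view-job", "analysts");

            Configuration dagAMConf = new Configuration(false);
            // Copy the MR view ACL unless a Tez one is already set.
            dagAMConf.setIfUnset("tez.am.view-acls",
                    mrConf.get("mapreduce.job.acl-view-job", " "));
            System.out.println(dagAMConf.get("tez.am.view-acls")); // analysts

            // An explicit Tez setting is never overwritten.
            dagAMConf.set("tez.am.view-acls", "admins");
            dagAMConf.setIfUnset("tez.am.view-acls", "analysts");
            System.out.println(dagAMConf.get("tez.am.view-acls")); // admins
        }
    }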

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java Thu Nov 27 12:49:54 2014
@@ -1,7 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.pig.backend.hadoop.executionengine.tez.util;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.pig.PigException;
@@ -12,19 +30,21 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.tez.POLocalRearrangeTez;
-import org.apache.pig.backend.hadoop.executionengine.tez.POStoreTez;
-import org.apache.pig.backend.hadoop.executionengine.tez.POValueOutputTez;
-import org.apache.pig.backend.hadoop.executionengine.tez.RoundRobinPartitioner;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezEdgeDescriptor;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.RoundRobinPartitioner;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.runtime.library.input.UnorderedKVInput;
 import org.apache.tez.runtime.library.output.UnorderedKVOutput;
@@ -172,4 +192,19 @@ public class TezCompilerUtil {
         edge.setIntermediateOutputValueClass(TUPLE_CLASS);
     }
 
+    /**
+     * Returns true if there are no loads or stores in a TezOperator.
+     * To be called only after LoaderProcessor is called
+     */
+    static public boolean isIntermediateReducer(TezOperator tezOper) throws VisitorException {
+        boolean intermediateReducer = false;
+        LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(tezOper.plan, POStore.class);
+        // Not map and not final reducer
+        if (stores.size() <= 0 &&
+                (tezOper.getLoaderInfo().getLoads() == null || tezOper.getLoaderInfo().getLoads().size() <= 0)) {
+            intermediateReducer = true;
+        }
+        return intermediateReducer;
+    }
+
 }
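
isIntermediateReducer treats a vertex as intermediate only when it has
neither stores nor loads, and the null check matters because
getLoaderInfo().getLoads() is only populated once LoaderProcessor has run,
hence the javadoc caveat. The predicate in isolation (a sketch with plain
lists standing in for the plan objects):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class IntermediateReducerDemo {
        // Same shape as isIntermediateReducer: no stores and no loads.
        static boolean isIntermediate(List<String> loads, List<String> stores) {
            return stores.isEmpty() && (loads == null || loads.isEmpty());
        }

        public static void main(String[] args) {
            List<String> none = Collections.emptyList();
            System.out.println(isIntermediate(none, none));                    // true
            System.out.println(isIntermediate(Arrays.asList("in.txt"), none)); // false
        }
    }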

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java Thu Nov 27 12:49:54 2014
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.pig.backend.hadoop.executionengine.util;
 
 import java.util.List;
@@ -5,6 +22,8 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.Accumulator;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.BinaryExpressionOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
@@ -27,6 +46,17 @@ import org.apache.pig.impl.PigContext;
 public class AccumulatorOptimizerUtil {
     private static final Log LOG = LogFactory.getLog(AccumulatorOptimizerUtil.class);
 
+    public static int getAccumulativeBatchSize() {
+        int batchSize = 20000;
+        if (PigMapReduce.sJobConfInternal.get() != null) {
+            String size = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PIG_ACCUMULATIVE_BATCHSIZE);
+            if (size != null) {
+                batchSize = Integer.parseInt(size);
+            }
+        }
+        return batchSize;
+    }
+
     public static void addAccumulator(PhysicalPlan plan) {
         // See if this is a map-reduce job
         List<PhysicalOperator> pos = plan.getRoots();
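
The batch size read above controls how many tuples the runtime hands to an
accumulating UDF per accumulate() call: 20000 by default, overridable via
pig.accumulative.batchsize. A minimal COUNT-style accumulator showing the
contract (a sketch; the real built-ins live in org.apache.pig.builtin):

    import java.io.IOException;

    import org.apache.pig.Accumulator;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;

    // The runtime calls accumulate() once per batch of up to
    // pig.accumulative.batchsize tuples, then getValue(), then cleanup().
    public class AccumulativeCount extends EvalFunc<Long> implements Accumulator<Long> {
        private long count = 0;

        @Override
        public void accumulate(Tuple b) throws IOException {
            DataBag batch = (DataBag) b.get(0); // one batch of input tuples
            count += batch.size();
        }

        @Override
        public Long getValue() {
            return count;
        }

        @Override
        public void cleanup() {
            count = 0;
        }

        @Override
        public Long exec(Tuple input) throws IOException {
            // Non-accumulative fallback path.
            return ((DataBag) input.get(0)).size();
        }
    }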

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java Thu Nov 27 12:49:54 2014
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.pig.backend.hadoop.executionengine.util;
 
 import java.util.ArrayList;

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Thu Nov 27 12:49:54 2014
@@ -293,11 +293,25 @@ public class MapRedUtil {
         }
     };
 
+    public static long getPathLength(FileSystem fs, FileStatus status)
+            throws IOException{
+        return getPathLength(fs, status, Long.MAX_VALUE);
+    }
+
     /**
-     * Returns the total number of bytes for this file,
-     * or if a directory all files in the directory.
+     * Returns the total number of bytes for this file, or if a directory all
+     * files in the directory.
+     * 
+     * @param fs FileSystem
+     * @param status FileStatus
+     * @param max Maximum total length that triggers an early exit. Often we
+     * only care whether the total length of the files exceeds a threshold;
+     * in that case the function can return as soon as the running total
+     * passes max.
+     * @return the total length, or a partial total greater than max
+     * @throws IOException if listing the directory fails
      */
-    public static long getPathLength(FileSystem fs, FileStatus status)
+    public static long getPathLength(FileSystem fs, FileStatus status, long max)
             throws IOException {
         if (!status.isDir()) {
             return status.getLen();
@@ -306,7 +320,8 @@ public class MapRedUtil {
                     status.getPath(), hiddenFileFilter);
             long size = 0;
             for (FileStatus child : children) {
-                size += getPathLength(fs, child);
+                size += getPathLength(fs, child, max);
+                if (size > max) return size;
             }
             return size;
         }
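
The max parameter turns getPathLength into a cheap threshold test: a
caller that only needs to know whether an input is small enough can stop
summing as soon as the limit is passed, instead of walking the whole
directory tree. A hedged usage sketch (the helper and threshold are
illustrative, not from this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;

    public class InputSizeCheck {
        // True if the input is no larger than maxBytes; getPathLength stops
        // summing children early once the running total exceeds maxBytes.
        static boolean fitsUnder(Configuration conf, Path input, long maxBytes)
                throws Exception {
            FileSystem fs = input.getFileSystem(conf);
            FileStatus status = fs.getFileStatus(input);
            return MapRedUtil.getPathLength(fs, status, maxBytes) <= maxBytes;
        }
    }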

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java Thu Nov 27 12:49:54 2014
@@ -38,18 +38,16 @@ public class ParallelConstantVisitor ext
 
     @Override
     public void visitConstant(ConstantExpression cnst) throws VisitorException {
-        if (cnst.getRequestedParallelism() == -1) {
-            Object obj = cnst.getValue();
-            if (obj instanceof Integer) {
-                if (replaced) {
-                    // sample job should have only one ConstantExpression
-                    throw new VisitorException("Invalid reduce plan: more " +
-                            "than one ConstantExpression found in sampling job");
-                }
-                cnst.setValue(rp);
-                cnst.setRequestedParallelism(rp);
-                replaced = true;
+        Object obj = cnst.getValue();
+        if (obj instanceof Integer) {
+            if (replaced) {
+                // sample job should have only one ConstantExpression
+                throw new VisitorException("Invalid reduce plan: more " +
+                        "than one ConstantExpression found in sampling job");
             }
+            cnst.setValue(rp);
+            cnst.setRequestedParallelism(rp);
+            replaced = true;
         }
     }
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseBinaryConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseBinaryConverter.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseBinaryConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseBinaryConverter.java Thu Nov 27 12:49:54 2014
@@ -93,7 +93,7 @@ public class HBaseBinaryConverter implem
 
     @Override
     public Map<String, Object> bytesToMap(byte[] b, ResourceFieldSchema fieldSchema) throws IOException {
-        return bytesToMap(b, fieldSchema);
+        throw new ExecException("Can't generate a Map from byte[]");
     }
 
     /**
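
The one-line change above fixes an unbounded self-recursion: the old body
called bytesToMap(b, fieldSchema), i.e. itself, so any attempt to cast a
byte array to a map died with a StackOverflowError instead of a meaningful
error. Sketched as a quick check (the harness is illustrative):

    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.backend.hadoop.hbase.HBaseBinaryConverter;

    public class BytesToMapCheck {
        public static void main(String[] args) throws Exception {
            HBaseBinaryConverter converter = new HBaseBinaryConverter();
            try {
                // Before the fix this recursed until StackOverflowError;
                // now it fails fast with a meaningful message.
                converter.bytesToMap("x".getBytes("UTF-8"), null);
            } catch (ExecException e) {
                System.out.println(e.getMessage()); // Can't generate a Map from byte[]
            }
        }
    }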

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Thu Nov 27 12:49:54 2014
@@ -24,6 +24,8 @@ import java.lang.reflect.Method;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.net.MalformedURLException;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -74,6 +76,7 @@ import org.apache.hadoop.mapreduce.Outpu
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.pig.CollectableLoadFunc;
 import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.LoadPushDown;
@@ -82,8 +85,10 @@ import org.apache.pig.OrderedLoadFunc;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.StoreResources;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.backend.hadoop.hbase.HBaseTableInputFormat.HBaseTableIFBuilder;
+import org.apache.pig.builtin.FuncUtils;
 import org.apache.pig.builtin.Utf8StorageConverter;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
@@ -135,7 +140,8 @@ import com.google.common.collect.Lists;
  * <code>buddies</code> column family in the <code>SampleTableCopy</code> table.
  *
  */
-public class HBaseStorage extends LoadFunc implements StoreFuncInterface, LoadPushDown, OrderedLoadFunc {
+public class HBaseStorage extends LoadFunc implements StoreFuncInterface, LoadPushDown, OrderedLoadFunc, StoreResources,
+        CollectableLoadFunc {
 
     private static final Log LOG = LogFactory.getLog(HBaseStorage.class);
 
@@ -317,7 +323,7 @@ public class HBaseStorage extends LoadFu
 			if ("true".equalsIgnoreCase(value) || "".equalsIgnoreCase(value) || value == null) {//the empty string and null check is for backward compat.
 				noWAL_ = true;
 			}
-		}        
+		}
 
         if (configuredOptions_.hasOption("minTimestamp")){
             minTimestamp_ = Long.parseLong(configuredOptions_.getOptionValue("minTimestamp"));
@@ -719,7 +725,6 @@ public class HBaseStorage extends LoadFu
         Properties udfProps = getUDFProperties();
         job.getConfiguration().setBoolean("pig.noSplitCombination", true);
 
-        initializeHBaseClassLoaderResources(job);
         m_conf = initializeLocalJobConfig(job);
         String delegationTokenSet = udfProps.getProperty(HBASE_TOKEN_SET);
         if (delegationTokenSet == null) {
@@ -748,14 +753,23 @@ public class HBaseStorage extends LoadFu
         }
     }
 
-    private void initializeHBaseClassLoaderResources(Job job) throws IOException {
+    @Override
+    public List<String> getShipFiles() {
         // Depend on HBase to do the right thing when available, as of HBASE-9165
         try {
             Method addHBaseDependencyJars =
               TableMapReduceUtil.class.getMethod("addHBaseDependencyJars", Configuration.class);
             if (addHBaseDependencyJars != null) {
-                addHBaseDependencyJars.invoke(null, job.getConfiguration());
-                return;
+                Configuration conf = new Configuration();
+                addHBaseDependencyJars.invoke(null, conf);
+                if (conf.get("tmpjars") != null) {
+                    String[] tmpjars = conf.getStrings("tmpjars");
+                    List<String> shipFiles = new ArrayList<String>(tmpjars.length);
+                    for (String tmpjar : tmpjars) {
+                        shipFiles.add(new URL(tmpjar).getPath());
+                    }
+                    return shipFiles;
+                }
             }
         } catch (NoSuchMethodException e) {
             LOG.debug("TableMapReduceUtils#addHBaseDependencyJars not available."
@@ -766,32 +780,32 @@ public class HBaseStorage extends LoadFu
         } catch (InvocationTargetException e) {
             LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation"
               + " failed. Falling back to previous logic.", e);
+        } catch (MalformedURLException e) {
+            LOG.debug("TableMapReduceUtils#addHBaseDependencyJars tmpjars"
+                    + " had malformed url. Falling back to previous logic.", e);
         }
-        // fall back to manual class handling.
-        // Make sure the HBase, ZooKeeper, and Guava jars get shipped.
-        TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
-            org.apache.hadoop.hbase.client.HTable.class, // main hbase jar or hbase-client
-            org.apache.hadoop.hbase.mapreduce.TableSplit.class, // main hbase jar or hbase-server
-            com.google.common.collect.Lists.class, // guava
-            org.apache.zookeeper.ZooKeeper.class); // zookeeper
+
+        List<Class> classList = new ArrayList<Class>();
+        classList.add(org.apache.hadoop.hbase.client.HTable.class); // main hbase jar or hbase-client
+        classList.add(org.apache.hadoop.hbase.mapreduce.TableSplit.class); // main hbase jar or hbase-server
+        classList.add(com.google.common.collect.Lists.class); // guava
+        classList.add(org.apache.zookeeper.ZooKeeper.class); // zookeeper
         // Additional jars that are specific to v0.95.0+
-        addClassToJobIfExists(job, "org.cloudera.htrace.Trace"); // htrace
-        addClassToJobIfExists(job, "org.apache.hadoop.hbase.protobuf.generated.HBaseProtos"); // hbase-protocol
-        addClassToJobIfExists(job, "org.apache.hadoop.hbase.TableName"); // hbase-common
-        addClassToJobIfExists(job, "org.apache.hadoop.hbase.CompatibilityFactory"); // hbase-hadoop-compar
-        addClassToJobIfExists(job, "org.jboss.netty.channel.ChannelFactory"); // netty
-    }
-
-    private void addClassToJobIfExists(Job job, String className) throws IOException {
-      Class klass = null;
-      try {
-          klass = Class.forName(className);
-      } catch (ClassNotFoundException e) {
-          LOG.debug("Skipping adding jar for class: " + className);
-          return;
-      }
+        addClassToList("org.cloudera.htrace.Trace", classList); // htrace
+        addClassToList("org.apache.hadoop.hbase.protobuf.generated.HBaseProtos", classList); // hbase-protocol
+        addClassToList("org.apache.hadoop.hbase.TableName", classList); // hbase-common
+        addClassToList("org.apache.hadoop.hbase.CompatibilityFactory", classList); // hbase-hadoop-compar
+        addClassToList("org.jboss.netty.channel.ChannelFactory", classList); // netty
+        return FuncUtils.getShipFiles(classList);
+    }
 
-      TableMapReduceUtil.addDependencyJars(job.getConfiguration(), klass);
+    private void addClassToList(String className, List<Class> classList) {
+        try {
+            Class klass = Class.forName(className);
+            classList.add(klass);
+        } catch (ClassNotFoundException e) {
+            LOG.debug("Skipping adding jar for class: " + className);
+        }
     }
 
     private JobConf initializeLocalJobConfig(Job job) {
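
The getShipFiles() rewrite above keeps the reflective probe pattern: look
up TableMapReduceUtil.addHBaseDependencyJars at runtime (it only exists in
HBase releases with HBASE-9165), invoke it against a scratch object to
collect the dependency jars, and fall back to a hand-maintained class list
on any reflective failure. The pattern in isolation (class and method
names here are made up for illustration):

    import java.lang.reflect.Method;
    import java.util.Properties;

    public class OptionalApiProbe {
        // Calls the newer API if the installed library has it, otherwise
        // falls back to the legacy path, as getShipFiles does above.
        static String dependencyJars() {
            try {
                Class<?> util = Class.forName("org.example.TableUtil"); // illustrative
                Method m = util.getMethod("addDependencyJars", Properties.class);
                Properties scratch = new Properties();
                m.invoke(null, scratch); // static method: null receiver
                return scratch.getProperty("tmpjars");
            } catch (ReflectiveOperationException e) {
                // Missing class/method or failed invocation: use the manual list.
                return legacyJarList();
            }
        }

        static String legacyJarList() {
            return "hbase-client.jar,zookeeper.jar,guava.jar";
        }
    }
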
@@ -940,21 +954,25 @@ public class HBaseStorage extends LoadFu
                         DataType.findType(t.get(i)) : fieldSchemas[i].getType()));
             } else {
                 Map<String, Object> cfMap = (Map<String, Object>) t.get(i);
-                for (String colName : cfMap.keySet()) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("putNext - colName=" + colName +
-                                  ", class: " + colName.getClass());
+                if (cfMap != null) {
+                    for (String colName : cfMap.keySet()) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("putNext - colName=" + colName +
+                                      ", class: " + colName.getClass());
+                        }
+                        // TODO deal with the fact that maps can have types now. Currently we detect types at
+                        // runtime in the case of storing to a cf, which is suboptimal.
+                        put.add(columnInfo.getColumnFamily(), Bytes.toBytes(colName.toString()), ts,
+                                objToBytes(cfMap.get(colName), DataType.findType(cfMap.get(colName))));
                     }
-                    // TODO deal with the fact that maps can have types now. Currently we detect types at
-                    // runtime in the case of storing to a cf, which is suboptimal.
-                    put.add(columnInfo.getColumnFamily(), Bytes.toBytes(colName.toString()), ts,
-                            objToBytes(cfMap.get(colName), DataType.findType(cfMap.get(colName))));
                 }
             }
         }
 
         try {
-            writer.write(null, put);
+            if (!put.isEmpty()) {
+                writer.write(null, put);
+            }
         } catch (InterruptedException e) {
             throw new IOException(e);
         }
@@ -1031,7 +1049,6 @@ public class HBaseStorage extends LoadFu
             schema_ = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
         }
 
-        initializeHBaseClassLoaderResources(job);
         m_conf = initializeLocalJobConfig(job);
         // Not setting a udf property and getting the hbase delegation token
         // only once like in setLocation as setStoreLocation gets different Job
@@ -1115,28 +1132,24 @@ public class HBaseStorage extends LoadFu
         return new RequiredFieldResponse(true);
     }
 
-    @Override
-    public WritableComparable<InputSplit> getSplitComparable(InputSplit split)
-            throws IOException {
-        return new WritableComparable<InputSplit>() {
-            TableSplit tsplit = new TableSplit();
-
-            @Override
-            public void readFields(DataInput in) throws IOException {
-                tsplit.readFields(in);
-            }
-
-            @Override
-            public void write(DataOutput out) throws IOException {
-                tsplit.write(out);
-            }
+    public void ensureAllKeyInstancesInSameSplit() throws IOException {
+        /**
+         * No-op because HBase keys are unique. This also works with policies
+         * like DelimitedKeyPrefixRegionSplitPolicy, where a partial key match
+         * must land in the same split.
+         */
+        LOG.debug("ensureAllKeyInstancesInSameSplit");
+    }
 
-            @Override
-            public int compareTo(InputSplit split) {
-                return tsplit.compareTo((TableSplit) split);
-            }
-        };
+    @Override
+    public WritableComparable<TableSplit> getSplitComparable(InputSplit split) throws IOException {
+        if (split instanceof TableSplit) {
+            return new TableSplitComparable((TableSplit) split);
+        } else {
+            throw new RuntimeException("LoadFunc expected split of type TableSplit but was " + split.getClass().getName());
+        }
     }
+ 
 
     /**
      * Class to encapsulate logic around which column names were specified in each

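The getSplitComparable() rewrite above replaces the anonymous WritableComparable, whose compareTo() consulted a freshly constructed (empty) TableSplit unless readFields() had been called first, with the named TableSplitComparable initialized from the actual split. A minimal sketch of such a wrapper, assuming the committed class follows the obvious delegation (details may differ):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.hbase.mapreduce.TableSplit;
    import org.apache.hadoop.io.WritableComparable;

    // Sketch in the spirit of TableSplitComparable: wraps the real split,
    // so compareTo() is meaningful before readFields() is ever called.
    public class TableSplitComparableSketch implements WritableComparable<TableSplit> {
        private TableSplit tsplit;

        public TableSplitComparableSketch() {
            this.tsplit = new TableSplit(); // only for Writable deserialization
        }

        public TableSplitComparableSketch(TableSplit tsplit) {
            this.tsplit = tsplit;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            tsplit.readFields(in);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            tsplit.write(out);
        }

        @Override
        public int compareTo(TableSplit other) {
            return tsplit.compareTo(other);
        }
    }
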
Modified: pig/branches/spark/src/org/apache/pig/builtin/ABS.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/ABS.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/ABS.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/ABS.java Thu Nov 27 12:49:54 2014
@@ -42,7 +42,7 @@ public class ABS extends EvalFunc<Double
 	 * @return output returns a single numeric value, absolute value of the argument
 	 */
 	public Double exec(Tuple input) throws IOException {
-        if (input == null || input.size() == 0)
+        if (input == null || input.size() == 0 || input.get(0) == null)
             return null;
 
         Double d;

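The added input.get(0) == null check is Pig's usual null-safety guard for EvalFuncs; the same one-line change recurs below in BigDecimalAbs, BigIntegerAbs, DoubleRound, FloatRound, and ROUND. A self-contained sketch of the convention, using a hypothetical UDF name:

    import java.io.IOException;

    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.Tuple;

    // Hypothetical UDF showing the guard: a missing or null argument
    // propagates as a null result instead of an NPE failing the task.
    public class SafeAbs extends EvalFunc<Double> {
        @Override
        public Double exec(Tuple input) throws IOException {
            if (input == null || input.size() == 0 || input.get(0) == null)
                return null;
            return Math.abs(((Number) input.get(0)).doubleValue());
        }
    }
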
Modified: pig/branches/spark/src/org/apache/pig/builtin/AvroStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/AvroStorage.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/AvroStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/AvroStorage.java Thu Nov 27 12:49:54 2014
@@ -31,6 +31,7 @@ import org.apache.avro.Schema.Type;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.mapred.AvroInputFormat;
 import org.apache.avro.mapred.AvroOutputFormat;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -63,9 +64,11 @@ import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceStatistics;
 import org.apache.pig.StoreFunc;
 import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.StoreResources;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.JarManager;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.impl.util.avro.AvroArrayReader;
@@ -82,7 +85,7 @@ import com.google.common.collect.Maps;
  *
  */
 public class AvroStorage extends LoadFunc
-    implements StoreFuncInterface, LoadMetadata, LoadPushDown {
+    implements StoreFuncInterface, LoadMetadata, LoadPushDown, StoreResources {
 
   /**
    *  Creates new instance of Pig Storage function, without specifying
@@ -593,7 +596,11 @@ public class AvroStorage extends LoadFun
         } else {
           rr = new AvroRecordReader(s);
         }
-        rr.initialize(is, tc);
+        try {
+            rr.initialize(is, tc);
+        } finally {
+            rr.close();
+        }
         tc.setStatus(is.toString());
         return rr;
       }
@@ -674,4 +681,9 @@ public class AvroStorage extends LoadFun
 
   }
 
+  @Override
+  public List<String> getShipFiles() {
+      Class[] classList = new Class[] {Schema.class, AvroInputFormat.class};
+      return FuncUtils.getShipFiles(classList);
+  }
 }

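AvroStorage now implements StoreResources and declares its runtime dependencies through getShipFiles(), handing representative classes (here the Avro Schema and AvroInputFormat) to FuncUtils.getShipFiles(); JsonLoader, JsonStorage, and OrcStorage below do the same. A sketch of the likely resolution step, assuming each class is mapped to the artifact it was loaded from via its code source (FuncUtils' actual implementation may differ):

    import java.net.URL;
    import java.security.CodeSource;
    import java.util.ArrayList;
    import java.util.List;

    public class ShipFilesSketch {
        // Map each class to the jar (or directory) it was loaded from.
        public static List<String> getShipFiles(List<Class<?>> classList) {
            List<String> jars = new ArrayList<String>();
            for (Class<?> klass : classList) {
                CodeSource src = klass.getProtectionDomain().getCodeSource();
                if (src == null) {
                    continue; // bootstrap classes have no code source
                }
                URL location = src.getLocation();
                if (location != null && !jars.contains(location.getPath())) {
                    jars.add(location.getPath());
                }
            }
            return jars;
        }
    }
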
Modified: pig/branches/spark/src/org/apache/pig/builtin/BigDecimalAbs.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/BigDecimalAbs.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/BigDecimalAbs.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/BigDecimalAbs.java Thu Nov 27 12:49:54 2014
@@ -27,6 +27,8 @@ import org.apache.pig.data.Tuple;
 public class BigDecimalAbs extends EvalFunc<BigDecimal> {
     @Override
     public BigDecimal exec(Tuple input) throws IOException {
+        if (input == null || input.size() == 0 || input.get(0) == null)
+            return null;
         return ((BigDecimal)input.get(0)).abs();
     }
 

Modified: pig/branches/spark/src/org/apache/pig/builtin/BigIntegerAbs.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/BigIntegerAbs.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/BigIntegerAbs.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/BigIntegerAbs.java Thu Nov 27 12:49:54 2014
@@ -27,6 +27,8 @@ import org.apache.pig.data.Tuple;
 public class BigIntegerAbs extends EvalFunc<BigInteger> {
     @Override
     public BigInteger exec(Tuple input) throws IOException {
+        if (input == null || input.size() == 0 || input.get(0) == null)
+            return null;
         return ((BigInteger)input.get(0)).abs();
     }
 

Modified: pig/branches/spark/src/org/apache/pig/builtin/Bloom.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/Bloom.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/Bloom.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/Bloom.java Thu Nov 27 12:49:54 2014
@@ -21,7 +21,9 @@ package org.apache.pig.builtin;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
+import java.io.File;
 import java.io.FileInputStream;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -29,7 +31,6 @@ import java.util.List;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.bloom.BloomFilter;
 import org.apache.hadoop.util.bloom.Key;
-
 import org.apache.pig.FilterFunc;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
@@ -94,9 +95,22 @@ public class Bloom extends FilterFunc {
 
     private void init() throws IOException {
         filter = new BloomFilter();
-        String dcFile = "./" + getFilenameFromPath(bloomFile) +
-            "/part-r-00000";
-        filter.readFields(new DataInputStream(new FileInputStream(dcFile)));
+        String dir = "./" + getFilenameFromPath(bloomFile);
+        String[] partFiles = new File(dir)
+                .list(new FilenameFilter() {
+                    @Override
+                    public boolean accept(File current, String name) {
+                        return name.startsWith("part");
+                    }
+                });
+
+        String dcFile = dir + "/" + partFiles[0];
+        DataInputStream dis = new DataInputStream(new FileInputStream(dcFile));
+        try {
+            filter.readFields(dis);
+        } finally {
+            dis.close();
+        }
     }
 
     /**

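Bloom.init() now scans the localized filter directory for the first part-* file instead of hard-coding part-r-00000, which also covers map-only jobs that produce part-m-* output. Note that File.list() returns null when the directory does not exist, so a defensive variant of the lookup (a sketch, not the committed code) would be:

    import java.io.File;
    import java.io.FileNotFoundException;
    import java.io.FilenameFilter;
    import java.io.IOException;

    public class PartFileSketch {
        // Return the first part-* file under dir, failing with a clear
        // message rather than an NPE when the directory or files are absent.
        static String firstPartFile(String dir) throws IOException {
            String[] partFiles = new File(dir).list(new FilenameFilter() {
                @Override
                public boolean accept(File current, String name) {
                    return name.startsWith("part");
                }
            });
            if (partFiles == null || partFiles.length == 0) {
                throw new FileNotFoundException("No part files found in " + dir);
            }
            return dir + "/" + partFiles[0];
        }
    }
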
Modified: pig/branches/spark/src/org/apache/pig/builtin/BuildBloom.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/BuildBloom.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/BuildBloom.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/BuildBloom.java Thu Nov 27 12:49:54 2014
@@ -39,15 +39,15 @@ import org.apache.pig.impl.logicalLayer.
  * define bb BuildBloom('jenkins', '100', '0.1');
  * A = load 'foo' as (x, y);
  * B = group A all;
- * C = foreach B generate BuildBloom(A.x);
+ * C = foreach B generate bb(A.x);
  * store C into 'mybloom';
  * The bloom filter can be on multiple keys by passing more than one field
  * (or the entire bag) to BuildBloom.
  * The resulting file can then be used in a Bloom filter as:
- * define bloom Bloom(mybloom);
+ * define bloom Bloom('mybloom');
  * A = load 'foo' as (x, y);
  * B = load 'bar' as (z);
- * C = filter B by Bloom(z);
+ * C = filter B by bloom(z);
  * D = join C by z, A by x;
  * It uses {@link org.apache.hadoop.util.bloom.BloomFilter}.
  */

Modified: pig/branches/spark/src/org/apache/pig/builtin/Distinct.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/Distinct.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/Distinct.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/Distinct.java Thu Nov 27 12:49:54 2014
@@ -22,6 +22,9 @@ import java.io.IOException;
 
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
+import org.apache.pig.JVMReuseManager;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.StaticDataCleanup;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.data.BagFactory;
@@ -38,35 +41,35 @@ import org.apache.pig.data.TupleFactory;
  */
 public class Distinct  extends EvalFunc<DataBag> implements Algebraic {
 
-    private static BagFactory bagFactory = BagFactory.getInstance();
     private static TupleFactory tupleFactory = TupleFactory.getInstance();
-    /* (non-Javadoc)
-     * @see org.apache.pig.EvalFunc#exec(org.apache.pig.data.Tuple)
-     */
+    private static boolean initialized = false;
+    private static boolean useDefaultBag = false;
+
+    static {
+        JVMReuseManager.getInstance().registerForStaticDataCleanup(Distinct.class);
+    }
+
+    @StaticDataCleanup
+    public static void staticDataCleanup() {
+        initialized = false;
+        useDefaultBag = false;
+    }
+
     @Override
     public DataBag exec(Tuple input) throws IOException {
         return getDistinct(input);
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.pig.Algebraic#getFinal()
-     */
     @Override
     public String getFinal() {
         return Final.class.getName();
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.pig.Algebraic#getInitial()
-     */
     @Override
     public String getInitial() {
         return Initial.class.getName();
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.pig.Algebraic#getIntermed()
-     */
     @Override
     public String getIntermed() {
         return Intermediate.class.getName();
@@ -74,13 +77,10 @@ public class Distinct  extends EvalFunc<
 
     static public class Initial extends EvalFunc<Tuple> {
 
-        /* (non-Javadoc)
-         * @see org.apache.pig.EvalFunc#exec(org.apache.pig.data.Tuple)
-         */
         @Override
         public Tuple exec(Tuple input) throws IOException {
             // the input has  a single field which is a tuple
-            // representing the data we want to distinct. 
+            // representing the data we want to distinct.
             // unwrap, put in a bag and send down
             try {
                 Tuple single = (Tuple)input.get(0);
@@ -94,9 +94,6 @@ public class Distinct  extends EvalFunc<
 
     static public class Intermediate extends EvalFunc<Tuple> {
 
-        /* (non-Javadoc)
-         * @see org.apache.pig.EvalFunc#exec(org.apache.pig.data.Tuple)
-         */
         @Override
         public Tuple exec(Tuple input) throws IOException {
             return tupleFactory.newTuple(getDistinctFromNestedBags(input, this));
@@ -105,30 +102,27 @@ public class Distinct  extends EvalFunc<
 
     static public class Final extends EvalFunc<DataBag> {
 
-        /* (non-Javadoc)
-         * @see org.apache.pig.EvalFunc#exec(org.apache.pig.data.Tuple)
-         */
         @Override
         public DataBag exec(Tuple input) throws IOException {
             return getDistinctFromNestedBags(input, this);
         }
     }
-    
-    static private DataBag createDataBag() {
-    	// by default, we create InternalSortedBag, unless user configures
-		// explicitly to use old bag
-    	String bagType = null;
-        if (PigMapReduce.sJobConfInternal.get() != null) {     
-   			bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.distinct.type");       			
-   	    }
-                      
-    	if (bagType != null && bagType.equalsIgnoreCase("default")) {        	    	
-        	return BagFactory.getInstance().newDistinctBag();    			
-   	    } else {   	    	
-   	    	return new InternalDistinctBag(3);
-	    }
+
+    private static DataBag createDataBag() {
+        if (!initialized) {
+            initialized = true;
+            if (PigMapReduce.sJobConfInternal.get() != null) {
+                String bagType = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PIG_CACHEDBAG_DISTINCT_TYPE);
+                if (bagType != null && bagType.equalsIgnoreCase("default")) {
+                    useDefaultBag = true;
+                }
+            }
+        }
+        // By default we create an InternalDistinctBag, unless the user
+        // explicitly configures the old default bag type.
+        return useDefaultBag ? BagFactory.getInstance().newDistinctBag() : new InternalDistinctBag(3);
     }
-    
+
     static private DataBag getDistinctFromNestedBags(Tuple input, EvalFunc evalFunc) throws IOException {
         DataBag result = createDataBag();
         long progressCounter = 0;
@@ -144,7 +138,7 @@ public class Distinct  extends EvalFunc<
                 for (Tuple t : (DataBag)tuple.get(0)) {
                     result.add(t);
                     ++progressCounter;
-                    if((progressCounter % 1000) == 0){                      
+                    if((progressCounter % 1000) == 0){
                         evalFunc.progress();
                     }
                 }
@@ -154,7 +148,7 @@ public class Distinct  extends EvalFunc<
         }
         return result;
     }
-    
+
     protected DataBag getDistinct(Tuple input) throws IOException {
         try {
             DataBag inputBg = (DataBag)input.get(0);

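Distinct now reads pig.cachedbag.distinct.type once, caches the decision in statics, and registers a cleanup hook so a reused JVM (for example a held Tez container) re-reads the configuration for the next task. The same pattern, sketched for any UDF with cached static state and a hypothetical config knob, assuming JVMReuseManager invokes every @StaticDataCleanup method on container reuse:

    import org.apache.pig.JVMReuseManager;
    import org.apache.pig.StaticDataCleanup;

    public class CachedConfigUdf {
        private static boolean initialized = false;
        private static boolean useFastPath = false; // hypothetical cached setting

        static {
            // Ask Pig to reset these statics when the JVM is reused.
            JVMReuseManager.getInstance().registerForStaticDataCleanup(CachedConfigUdf.class);
        }

        @StaticDataCleanup
        public static void staticDataCleanup() {
            initialized = false;
            useFastPath = false;
        }

        static boolean useFastPath() {
            if (!initialized) {
                initialized = true;
                useFastPath = Boolean.getBoolean("my.udf.fastpath"); // hypothetical knob
            }
            return useFastPath;
        }
    }
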
Modified: pig/branches/spark/src/org/apache/pig/builtin/DoubleRound.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/DoubleRound.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/DoubleRound.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/DoubleRound.java Thu Nov 27 12:49:54 2014
@@ -37,7 +37,7 @@ public class DoubleRound extends EvalFun
 	 */
 	@Override
 	public Long exec(Tuple input) throws IOException {
-        if (input == null || input.size() == 0)
+        if (input == null || input.size() == 0 || input.get(0) == null)
             return null;
 
         try{

Modified: pig/branches/spark/src/org/apache/pig/builtin/FloatRound.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/FloatRound.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/FloatRound.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/FloatRound.java Thu Nov 27 12:49:54 2014
@@ -39,7 +39,7 @@ public class FloatRound extends EvalFunc
 	 */
 	@Override
 	public Integer exec(Tuple input) throws IOException {
-        if (input == null || input.size() == 0)
+        if (input == null || input.size() == 0 || input.get(0) == null)
             return null;
 
         try{

Modified: pig/branches/spark/src/org/apache/pig/builtin/JsonLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/JsonLoader.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/JsonLoader.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/JsonLoader.java Thu Nov 27 12:49:54 2014
@@ -19,28 +19,26 @@ package org.apache.pig.builtin;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.math.BigDecimal;
-import java.math.BigInteger;
 
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
 import org.joda.time.format.ISODateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
-
 import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.JsonParser;
 import org.codehaus.jackson.JsonToken;
-
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-
 import org.apache.pig.Expression;
 import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
@@ -56,6 +54,7 @@ import org.apache.pig.data.DataByteArray
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.util.JarManager;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.parser.ParserException;
@@ -153,23 +152,31 @@ public class JsonLoader extends LoadFunc
         // isn't what we expect we return a tuple with null fields rather than
         // throwing an exception.  That way a few mangled lines don't fail the
         // job.
-        if (p.nextToken() != JsonToken.START_OBJECT) {
-            warn("Bad record, could not find start of record " +
-                val.toString(), PigWarning.UDF_WARNING_1);
-            return t;
-        }
-
-        // Read each field in the record
-        for (int i = 0; i < fields.length; i++) {
-            t.set(i, readField(p, fields[i], i));
-        }
-
-        if (p.nextToken() != JsonToken.END_OBJECT) {
-            warn("Bad record, could not find end of record " +
-                val.toString(), PigWarning.UDF_WARNING_1);
-            return t;
+        
+        try {
+            if (p.nextToken() != JsonToken.START_OBJECT) {
+                warn("Bad record, could not find start of record " +
+                    val.toString(), PigWarning.UDF_WARNING_1);
+                return t;
+            }
+    
+            // Read each field in the record
+            for (int i = 0; i < fields.length; i++) {
+                t.set(i, readField(p, fields[i], i));
+            }
+    
+            if (p.nextToken() != JsonToken.END_OBJECT) {
+                warn("Bad record, could not find end of record " +
+                    val.toString(), PigWarning.UDF_WARNING_1);
+                return t;
+            }
+            
+        } catch (JsonParseException jpe) {
+            warn("Bad record, returning null for " + val, PigWarning.UDF_WARNING_1);
+        } finally {
+            p.close();
         }
-        p.close();
+        
         return t;
     }
 
@@ -242,7 +249,7 @@ public class JsonLoader extends LoadFunc
         case DataType.BIGDECIMAL:
             tok = p.nextToken();
             if (tok == JsonToken.VALUE_NULL) return null;
-            return p.getDecimalValue();
+            return new BigDecimal(p.getText());
 
         case DataType.MAP:
             // Should be a start of the map object
@@ -372,4 +379,11 @@ public class JsonLoader extends LoadFunc
     throws IOException {
         // We don't have partitions
     }
+
+    @Override
+    public List<String> getShipFiles() {
+        Class[] classList = new Class[] {JsonFactory.class};
+        return FuncUtils.getShipFiles(classList);
+    }
 }

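getNext() now treats a JsonParseException like the other bad-record cases: warn, keep the partially filled tuple of nulls, and always close the parser, so a few mangled lines do not fail the whole job. A condensed sketch of that control flow against the same Jackson 1.x API:

    import java.io.IOException;

    import org.codehaus.jackson.JsonFactory;
    import org.codehaus.jackson.JsonParseException;
    import org.codehaus.jackson.JsonParser;
    import org.codehaus.jackson.JsonToken;

    public class LenientJsonSketch {
        // Parse one record; malformed JSON yields null instead of an error.
        static String firstFieldValue(String line) throws IOException {
            JsonParser p = new JsonFactory().createJsonParser(line);
            try {
                if (p.nextToken() != JsonToken.START_OBJECT) {
                    return null; // bad record: no object start
                }
                p.nextToken(); // field name
                p.nextToken(); // field value
                return p.getText();
            } catch (JsonParseException jpe) {
                return null;   // bad record: mangled JSON
            } finally {
                p.close();     // always release the parser
            }
        }
    }
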
Modified: pig/branches/spark/src/org/apache/pig/builtin/JsonStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/JsonStorage.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/JsonStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/JsonStorage.java Thu Nov 27 12:49:54 2014
@@ -19,17 +19,15 @@ package org.apache.pig.builtin;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 
-import org.joda.time.DateTime;
-
 import org.codehaus.jackson.JsonEncoding;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonGenerator;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -38,25 +36,18 @@ import org.apache.hadoop.mapreduce.Outpu
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.ResourceStatistics;
 import org.apache.pig.StoreMetadata;
 import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreResources;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.Utils;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.TreeMap;
-
 /**
  * A JSON Pig store function.  Each Pig tuple is stored on one line (as one
  * value for TextOutputFormat) so that it can be read easily using
@@ -66,7 +57,7 @@ import java.util.TreeMap;
  * with mapping between JSON and Pig types. The schema file share the same format
  * as the one we use in PigStorage.
  */
-public class JsonStorage extends StoreFunc implements StoreMetadata {
+public class JsonStorage extends StoreFunc implements StoreMetadata, StoreResources {
 
     protected RecordWriter writer = null;
     protected ResourceSchema schema = null;
@@ -318,4 +309,14 @@ public class JsonStorage extends StoreFu
       return s;
     }
 
+    @Override
+    public List<String> getShipFiles() {
+        Class[] classList = new Class[] {JsonFactory.class};
+        return FuncUtils.getShipFiles(classList);
+    }
+
+    @Override
+    public List<String> getCacheFiles() {
+        return null;
+    }
 }

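JsonStorage likewise implements StoreResources, pairing getShipFiles() (jars for the job classpath) with getCacheFiles() (distributed-cache entries), where a null return means nothing to add. A minimal sketch of the contract, with a hypothetical jar path standing in for the resolved Jackson artifact:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.pig.StoreResources;

    public class StoreResourcesSketch implements StoreResources {
        @Override
        public List<String> getShipFiles() {
            // Hypothetical path; in practice resolved from a class, as in
            // FuncUtils.getShipFiles(new Class[] {JsonFactory.class}).
            return Arrays.asList("/path/to/jackson-core-asl.jar");
        }

        @Override
        public List<String> getCacheFiles() {
            return null; // nothing to place in the distributed cache
        }
    }
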
Modified: pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java Thu Nov 27 12:49:54 2014
@@ -20,11 +20,11 @@ package org.apache.pig.builtin;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -37,7 +37,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
@@ -50,12 +49,13 @@ import org.apache.hadoop.hive.ql.io.orc.
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.shims.HadoopShimsSecure;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -82,6 +82,7 @@ import org.apache.pig.Expression.BinaryE
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.ResourceStatistics;
 import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.StoreResources;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.DataType;
@@ -93,6 +94,7 @@ import org.apache.pig.impl.util.Utils;
 import org.apache.pig.impl.util.orc.OrcUtils;
 import org.joda.time.DateTime;
 
+import com.esotericsoftware.kryo.io.Input;
 import com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -111,7 +113,7 @@ import com.google.common.annotations.Vis
  * <li><code>-v, --version</code> Sets the version of the file that will be written
  * </ul>
  **/
-public class OrcStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata, LoadPushDown, LoadPredicatePushdown {
+public class OrcStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata, LoadPushDown, LoadPredicatePushdown, StoreResources {
 
     //TODO Make OrcInputFormat.SARG_PUSHDOWN visible
     private static final String SARG_PUSHDOWN = "sarg.pushdown";
@@ -384,6 +386,26 @@ public class OrcStorage extends LoadFunc
         }
     }
 
+    @Override
+    public List<String> getShipFiles() {
+        String hadoopVersion = "20S";
+        if (Utils.isHadoop23() || Utils.isHadoop2()) {
+            hadoopVersion = "23";
+        }
+        Class hadoopVersionShimsClass;
+        try {
+            hadoopVersionShimsClass = Class.forName("org.apache.hadoop.hive.shims.Hadoop" +
+                    hadoopVersion + "Shims");
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException("Cannot find Hadoop" + hadoopVersion + "ShimsClass in classpath");
+        }
+        Class[] classList = new Class[] {OrcFile.class, HiveConf.class, AbstractSerDe.class,
+                org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, hadoopVersionShimsClass,
+                Input.class};
+        return FuncUtils.getShipFiles(classList);
+    }
+
     private static Path getFirstFile(String location, FileSystem fs) throws IOException {
         String[] locations = getPathStrings(location);
         Path[] paths = new Path[locations.length];
@@ -498,8 +520,6 @@ public class OrcStorage extends LoadFunc
         for (ResourceFieldSchema field : schema.getFields()) {
             switch(field.getType()) {
             case DataType.BOOLEAN:
-                // TODO: ORC does not seem to support it
-                break;
             case DataType.INTEGER:
             case DataType.LONG:
             case DataType.FLOAT:
@@ -671,14 +691,12 @@ public class OrcStorage extends LoadFunc
     }
 
     private Object getSearchArgObjValue(Object value) {
-           // TODO Test BigInteger, BigInteger and DateTime
         if (value instanceof BigInteger) {
-            return HiveDecimal.create(((BigInteger)value));
+            return new BigDecimal((BigInteger)value);
         } else if (value instanceof BigDecimal) {
-            return HiveDecimal.create(((BigDecimal)value), false);
+            return value;
         } else if (value instanceof DateTime) {
-            //TODO is this right based on what DateTimeWritable.dateToDays() does? What about pig.datetime.default.tz?
-            return new DateWritable((int)(((DateTime)value).getMillis() / TimeUnit.DAYS.toMillis(1)));
+            return new Timestamp(((DateTime)value).getMillis());
         } else {
             return value;
         }

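getSearchArgObjValue() normalizes Pig literals before they reach the ORC SearchArgument builder: a BigInteger widens exactly to BigDecimal, and Pig's Joda DateTime becomes a java.sql.Timestamp via its epoch millis. The conversion in isolation:

    import java.math.BigDecimal;
    import java.math.BigInteger;
    import java.sql.Timestamp;

    import org.joda.time.DateTime;

    public class SargValueSketch {
        // Normalize Pig literal types to ones the SearchArgument accepts.
        static Object toSargValue(Object value) {
            if (value instanceof BigInteger) {
                return new BigDecimal((BigInteger) value); // exact widening
            } else if (value instanceof DateTime) {
                return new Timestamp(((DateTime) value).getMillis());
            }
            return value; // BigDecimal and other types pass through unchanged
        }
    }
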
Modified: pig/branches/spark/src/org/apache/pig/builtin/ROUND.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/ROUND.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/ROUND.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/ROUND.java Thu Nov 27 12:49:54 2014
@@ -44,7 +44,7 @@ public class ROUND extends EvalFunc<Long
 	 */
 	@Override
 	public Long exec(Tuple input) throws IOException {
-        if (input == null || input.size() == 0)
+        if (input == null || input.size() == 0 || input.get(0) == null)
             return null;
 
         try{

Modified: pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java Thu Nov 27 12:49:54 2014
@@ -148,7 +148,9 @@ public class TextLoader extends LoadFunc
 
     @Override
     public Map<String, Object> bytesToMap(byte[] b, ResourceFieldSchema schema) throws IOException {
-        return bytesToMap(b, schema);
+        int errCode = 2109;
+        String msg = "TextLoader does not support conversion to Map.";
+        throw new ExecException(msg, errCode, PigException.BUG);
     }
 
     /**