You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2014/11/27 13:50:02 UTC
svn commit: r1642132 [6/14] - in /pig/branches/spark: ./ bin/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/datetime/convert/
contrib/piggybank/java/sr...
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Thu Nov 27 12:49:54 2014
@@ -23,10 +23,11 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ThreadFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,6 +35,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.PigConfiguration;
+import org.apache.pig.PigException;
import org.apache.pig.PigWarning;
import org.apache.pig.backend.BackendException;
import org.apache.pig.backend.executionengine.ExecException;
@@ -41,40 +43,64 @@ import org.apache.pig.backend.hadoop.dat
import org.apache.pig.backend.hadoop.executionengine.Launcher;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.LoaderProcessor;
-import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.NoopFilterRemover;
-import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.ParallelismSetter;
-import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.UnionOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezCompiler;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPOPackageAnnotator;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerPrinter;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.NativeTezOper;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.AccumulatorOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.CombinerOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.LoaderProcessor;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.MultiQueryOptimizerTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.NoopFilterRemover;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.SecondaryKeyOptimizerTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.UnionOptimizer;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.CompilationMessageCollector;
import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.LogUtils;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
import org.apache.pig.tools.pigstats.tez.TezScriptState;
-import org.apache.pig.tools.pigstats.tez.TezStats;
-import org.apache.pig.tools.pigstats.tez.TezTaskStats;
-import org.apache.tez.common.TezUtils;
+import org.apache.pig.tools.pigstats.tez.TezVertexStats;
+import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
/**
* Main class that launches pig for Tez
*/
public class TezLauncher extends Launcher {
private static final Log log = LogFactory.getLog(TezLauncher.class);
- private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);
+ private static ThreadFactory namedThreadFactory;
+ private ExecutorService executor;
private boolean aggregateWarning = false;
private TezScriptState tezScriptState;
- private TezStats tezStats;
+ private TezPigScriptStats tezStats;
private TezJob runningJob;
+ public TezLauncher() {
+ if (namedThreadFactory == null) {
+ namedThreadFactory = new ThreadFactoryBuilder()
+ .setNameFormat("PigTezLauncher-%d")
+ .setUncaughtExceptionHandler(new JobControlThreadExceptionHandler())
+ .build();
+ }
+ executor = Executors.newSingleThreadExecutor(namedThreadFactory);
+ }
+
@Override
public PigStats launchPig(PhysicalPlan php, String grpName, PigContext pc) throws Exception {
if (pc.getExecType().isLocal()) {
@@ -83,7 +109,7 @@ public class TezLauncher extends Launche
pc.getProperties().setProperty("tez.ignore.lib.uris", "true");
}
Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties(), true);
- if (pc.defaultParallel == -1 && !conf.getBoolean(PigConfiguration.TEZ_AUTO_PARALLELISM, true)) {
+ if (pc.defaultParallel == -1 && !conf.getBoolean(PigConfiguration.PIG_TEZ_AUTO_PARALLELISM, true)) {
pc.defaultParallel = 1;
}
aggregateWarning = conf.getBoolean("aggregate.warning", false);
@@ -98,60 +124,52 @@ public class TezLauncher extends Launche
List<TezOperPlan> processedPlans = new ArrayList<TezOperPlan>();
tezScriptState = TezScriptState.get();
- tezStats = new TezStats(pc);
+ tezStats = new TezPigScriptStats(pc);
PigStats.start(tezStats);
conf.set(TezConfiguration.TEZ_USE_CLUSTER_HADOOP_LIBS, "true");
TezJobCompiler jc = new TezJobCompiler(pc, conf);
TezPlanContainer tezPlanContainer = compile(php, pc);
- int defaultTimeToSleep = pc.getExecType().isLocal() ? 100 : 1000;
- int timeToSleep = conf.getInt("pig.jobcontrol.sleep", defaultTimeToSleep);
- if (timeToSleep != defaultTimeToSleep) {
- log.info("overriding default JobControl sleep (" +
- defaultTimeToSleep + ") to " + timeToSleep);
- }
+ tezStats.initialize(tezPlanContainer);
+ tezScriptState.emitInitialPlanNotification(tezPlanContainer);
+ tezScriptState.emitLaunchStartedNotification(tezPlanContainer.size()); //number of DAGs to Launch
+ TezPlanContainerNode tezPlanContainerNode;
TezOperPlan tezPlan;
- while ((tezPlan=tezPlanContainer.getNextPlan(processedPlans)) != null) {
- optimize(tezPlan, pc);
+ int processedDAGs = 0;
+ while ((tezPlanContainerNode = tezPlanContainer.getNextPlan(processedPlans)) != null) {
+ tezPlan = tezPlanContainerNode.getTezOperPlan();
+ processLoadAndParallelism(tezPlan, pc);
processedPlans.add(tezPlan);
- ProgressReporter reporter = new ProgressReporter();
- tezStats.initialize(tezPlan);
+ ProgressReporter reporter = new ProgressReporter(tezPlanContainer.size(), processedDAGs);
if (tezPlan.size()==1 && tezPlan.getRoots().get(0) instanceof NativeTezOper) {
// Native Tez Plan
NativeTezOper nativeOper = (NativeTezOper)tezPlan.getRoots().get(0);
- tezScriptState.emitInitialPlanNotification(tezPlan);
- tezScriptState.emitLaunchStartedNotification(tezPlan.size());
tezScriptState.emitJobsSubmittedNotification(1);
- nativeOper.runJob();
+ nativeOper.runJob(tezPlanContainerNode.getOperatorKey().toString());
} else {
TezPOPackageAnnotator pkgAnnotator = new TezPOPackageAnnotator(tezPlan);
pkgAnnotator.visit();
- runningJob = jc.compile(tezPlan, grpName, tezPlanContainer);
-
- tezScriptState.emitInitialPlanNotification(tezPlan);
- tezScriptState.emitLaunchStartedNotification(tezPlan.size());
-
+ runningJob = jc.compile(tezPlanContainerNode, tezPlanContainer);
+ //TODO: Exclude vertex groups from numVerticesToLaunch ??
+ tezScriptState.dagLaunchNotification(runningJob.getName(), tezPlan, tezPlan.size());
+ runningJob.setPigStats(tezStats);
+
// Set the thread UDFContext so registered classes are available.
final UDFContext udfContext = UDFContext.getUDFContext();
- Thread task = new Thread(runningJob) {
+ Runnable task = new Runnable() {
@Override
public void run() {
+ Thread.currentThread().setContextClassLoader(PigContext.getClassLoader());
UDFContext.setUdfContext(udfContext.clone());
- super.run();
+ runningJob.run();
}
};
-
- JobControlThreadExceptionHandler jctExceptionHandler = new JobControlThreadExceptionHandler();
- task.setUncaughtExceptionHandler(jctExceptionHandler);
- task.setContextClassLoader(PigContext.getClassLoader());
-
- tezStats.setTezJob(runningJob);
-
+
// Mark the times that the jobs were submitted so it's reflected in job
- // history props
+ // history props. TODO: Fix this. unused now
long scriptSubmittedTimestamp = System.currentTimeMillis();
// Job.getConfiguration returns the shared configuration object
Configuration jobConf = runningJob.getConfiguration();
@@ -159,20 +177,42 @@ public class TezLauncher extends Launche
Long.toString(scriptSubmittedTimestamp));
jobConf.set("pig.job.submitted.timestamp",
Long.toString(System.currentTimeMillis()));
-
- Future<?> future = executor.schedule(task, timeToSleep, TimeUnit.MILLISECONDS);
-
+
+ Future<?> future = executor.submit(task);
tezScriptState.emitJobsSubmittedNotification(1);
- reporter.notifyStarted();
-
+
+ boolean jobStarted = false;
+
while (!future.isDone()) {
+ if (!jobStarted && runningJob.getApplicationId() != null) {
+ jobStarted = true;
+ String appId = runningJob.getApplicationId().toString();
+ //For Oozie Pig action job id matching compatibility with MR mode
+ log.info("HadoopJobId: "+ appId.replace("application", "job"));
+ tezScriptState.emitJobStartedNotification(appId);
+ tezScriptState.dagStartedNotification(runningJob.getName(), appId);
+ }
reporter.notifyUpdate();
Thread.sleep(1000);
}
-
- tezStats.accumulateStats(runningJob);
+ // For tez_local mode where PigProcessor destroys all UDFContext
+ UDFContext.setUdfContext(udfContext);
+ try {
+ // In case of FutureTask there is no uncaught exception
+ // Need to do future.get() to get any exception
+ future.get();
+ } catch (ExecutionException e) {
+ setJobException(e.getCause());
+ }
}
- tezScriptState.emitProgressUpdatedNotification(100);
+ processedDAGs++;
+ if (tezPlanContainer.size() == processedDAGs) {
+ tezScriptState.emitProgressUpdatedNotification(100);
+ } else {
+ tezScriptState.emitProgressUpdatedNotification(
+ ((tezPlanContainer.size() - processedDAGs)/tezPlanContainer.size()) * 100);
+ }
+ handleUnCaughtException(pc);
tezPlanContainer.updatePlan(tezPlan, reporter.notifyFinishedOrFailed());
}
@@ -203,6 +243,28 @@ public class TezLauncher extends Launche
return tezStats;
}
+ private void handleUnCaughtException(PigContext pc) throws Exception {
+ //check for the uncaught exceptions from TezJob thread
+ //if the job controller fails before launching the jobs then there are
+ //no jobs to check for failure
+ if (jobControlException != null) {
+ if (jobControlException instanceof PigException) {
+ if (jobControlExceptionStackTrace != null) {
+ LogUtils.writeLog("Error message from Tez Job",
+ jobControlExceptionStackTrace, pc
+ .getProperties().getProperty(
+ "pig.logfile"), log);
+ }
+ throw jobControlException;
+ } else {
+ int errCode = 2117;
+ String msg = "Unexpected error when launching Tez job.";
+ throw new ExecException(msg, errCode, PigException.BUG,
+ jobControlException);
+ }
+ }
+ }
+
private void computeWarningAggregate(Map<String, Map<String, Long>> counterGroups, Map<Enum, Long> aggMap) {
for (Map<String, Long> counters : counterGroups.values()) {
for (Enum e : PigWarning.values()) {
@@ -223,53 +285,57 @@ public class TezLauncher extends Launche
}
private class ProgressReporter {
+ private int totalDAGs;
+ private int processedDAGS;
private int count = 0;
private int prevProgress = 0;
- public void notifyStarted() throws IOException {
- for (Vertex v : runningJob.getDAG().getVertices()) {
- TezTaskStats tts = tezStats.getVertexStats(v.getName());
- UserPayload payload = v.getProcessorDescriptor().getUserPayload();
- Configuration conf = TezUtils.createConfFromUserPayload(payload);
- tts.setConf(conf);
- tts.setId(v.getName());
- tezScriptState.emitJobStartedNotification(v.getName());
- }
+ public ProgressReporter(int totalDAGs, int processedDAGs) {
+ this.totalDAGs = totalDAGs;
+ this.processedDAGS = processedDAGs;
}
public void notifyUpdate() {
DAGStatus dagStatus = runningJob.getDAGStatus();
if (dagStatus != null && dagStatus.getState() == DAGStatus.State.RUNNING) {
// Emit notification when the job has progressed more than 1%,
- // or every 10 second
+ // or every 20 seconds
int currProgress = Math.round(runningJob.getDAGProgress() * 100f);
if (currProgress - prevProgress >= 1 || count % 100 == 0) {
- tezScriptState.emitProgressUpdatedNotification(currProgress);
+ tezScriptState.dagProgressNotification(runningJob.getName(), -1, currProgress);
+ tezScriptState.emitProgressUpdatedNotification((currProgress + (100 * processedDAGS))/totalDAGs);
prevProgress = currProgress;
}
count++;
}
+ // TODO: Add new vertex tracking methods to PigTezProgressNotificationListener
+ // and emit notifications for individual vertex start, progress and completion
}
public boolean notifyFinishedOrFailed() {
DAGStatus dagStatus = runningJob.getDAGStatus();
+ if (dagStatus == null) {
+ return false;
+ }
if (dagStatus.getState() == DAGStatus.State.SUCCEEDED) {
Map<Enum, Long> warningAggMap = new HashMap<Enum, Long>();
- for (Vertex v : runningJob.getDAG().getVertices()) {
- TezTaskStats tts = tezStats.getVertexStats(v.getName());
- tezScriptState.emitjobFinishedNotification(tts);
- Map<String, Map<String, Long>> counterGroups = runningJob.getVertexCounters(v.getName());
- computeWarningAggregate(counterGroups, warningAggMap);
+ DAG dag = runningJob.getDAG();
+ for (Vertex v : dag.getVertices()) {
+ TezVertexStats tts = tezStats.getVertexStats(dag.getName(), v.getName());
+ if (tts == null) {
+ continue; //vertex groups
+ }
+ Map<String, Map<String, Long>> counterGroups = tts.getCounters();
+ if (counterGroups == null) {
+ log.warn("Counters are not available for vertex " + v.getName() + ". Not computing warning aggregates.");
+ } else {
+ computeWarningAggregate(counterGroups, warningAggMap);
+ }
}
if (aggregateWarning) {
CompilationMessageCollector.logAggregate(warningAggMap, MessageType.Warning, log);
}
return true;
- } else if (dagStatus.getState() == DAGStatus.State.FAILED) {
- for (Vertex v : ((TezJob)runningJob).getDAG().getVertices()) {
- TezTaskStats tts = tezStats.getVertexStats(v.getName());
- tezScriptState.emitJobFailedNotification(tts);
- }
}
return false;
}
@@ -281,10 +347,6 @@ public class TezLauncher extends Launche
VisitorException, IOException {
log.debug("Entering TezLauncher.explain");
TezPlanContainer tezPlanContainer = compile(php, pc);
- for (Map.Entry<OperatorKey,TezPlanContainerNode> entry : tezPlanContainer.getKeys().entrySet()) {
- TezOperPlan tezPlan = entry.getValue().getNode();
- optimize(tezPlan, pc);
- }
if (format.equals("text")) {
TezPlanContainerPrinter printer = new TezPlanContainerPrinter(ps, tezPlanContainer);
@@ -296,7 +358,20 @@ public class TezLauncher extends Launche
}
}
- public static void optimize(TezOperPlan tezPlan, PigContext pc) throws VisitorException {
+ public TezPlanContainer compile(PhysicalPlan php, PigContext pc)
+ throws PlanException, IOException, VisitorException {
+ TezCompiler comp = new TezCompiler(php, pc);
+ comp.compile();
+ TezPlanContainer planContainer = comp.getPlanContainer();
+ for (Map.Entry<OperatorKey, TezPlanContainerNode> entry : planContainer
+ .getKeys().entrySet()) {
+ TezOperPlan tezPlan = entry.getValue().getTezOperPlan();
+ optimize(tezPlan, pc);
+ }
+ return planContainer;
+ }
+
+ private void optimize(TezOperPlan tezPlan, PigContext pc) throws VisitorException {
Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
boolean aggregateWarning = conf.getBoolean("aggregate.warning", false);
@@ -304,10 +379,10 @@ public class TezLauncher extends Launche
filter.visit();
// Run CombinerOptimizer on Tez plan
- boolean nocombiner = conf.getBoolean(PigConfiguration.PROP_NO_COMBINER, false);
+ boolean nocombiner = conf.getBoolean(PigConfiguration.PIG_EXEC_NO_COMBINER, false);
if (!pc.inIllustrator && !nocombiner) {
boolean doMapAgg = Boolean.parseBoolean(pc.getProperties().getProperty(
- PigConfiguration.PROP_EXEC_MAP_PARTAGG, "false"));
+ PigConfiguration.PIG_EXEC_MAP_PARTAGG, "false"));
CombinerOptimizer co = new CombinerOptimizer(tezPlan, doMapAgg);
co.visit();
co.getMessageCollector().logMessages(MessageType.Warning, aggregateWarning, log);
@@ -321,7 +396,7 @@ public class TezLauncher extends Launche
skOptimizer.visit();
}
- boolean isMultiQuery = conf.getBoolean(PigConfiguration.OPT_MULTIQUERY, true);
+ boolean isMultiQuery = conf.getBoolean(PigConfiguration.PIG_OPT_MULTIQUERY, true);
if (isMultiQuery) {
// reduces the number of TezOpers in the Tez plan generated
// by multi-query (multi-store) script.
@@ -330,35 +405,32 @@ public class TezLauncher extends Launche
}
// Run AccumulatorOptimizer on Tez plan
- boolean isAccum = conf.getBoolean(PigConfiguration.OPT_ACCUMULATOR, true);
+ boolean isAccum = conf.getBoolean(PigConfiguration.PIG_OPT_ACCUMULATOR, true);
if (isAccum) {
AccumulatorOptimizer accum = new AccumulatorOptimizer(tezPlan);
accum.visit();
}
// Use VertexGroup in Tez
- boolean isUnionOpt = conf.getBoolean(PigConfiguration.TEZ_OPT_UNION, true);
+ boolean isUnionOpt = conf.getBoolean(PigConfiguration.PIG_TEZ_OPT_UNION, true);
if (isUnionOpt) {
UnionOptimizer uo = new UnionOptimizer(tezPlan);
uo.visit();
}
+ }
+
+ public static void processLoadAndParallelism(TezOperPlan tezPlan, PigContext pc) throws VisitorException {
if (!pc.inExplain && !pc.inDumpSchema) {
LoaderProcessor loaderStorer = new LoaderProcessor(tezPlan, pc);
loaderStorer.visit();
ParallelismSetter parallelismSetter = new ParallelismSetter(tezPlan, pc);
parallelismSetter.visit();
+ tezPlan.setEstimatedParallelism(parallelismSetter.getEstimatedTotalParallelism());
}
}
- public TezPlanContainer compile(PhysicalPlan php, PigContext pc)
- throws PlanException, IOException, VisitorException {
- TezCompiler comp = new TezCompiler(php, pc);
- comp.compile();
- return comp.getPlanContainer();
- }
-
@Override
public void kill() throws BackendException {
if (runningJob != null) {
@@ -368,9 +440,7 @@ public class TezLauncher extends Launche
throw new BackendException(e);
}
}
- if (executor != null) {
- executor.shutdownNow();
- }
+ destroy();
}
@Override
@@ -385,4 +455,17 @@ public class TezLauncher extends Launche
log.info("Cannot find job: " + jobID);
}
}
+
+ @Override
+ public void destroy() {
+ try {
+ if (executor != null && !executor.isShutdown()) {
+ log.info("Shutting down thread pool");
+ executor.shutdownNow();
+ }
+ } catch (Exception e) {
+ log.warn("Error shutting down threadpool");
+ }
+ }
+
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java Thu Nov 27 12:49:54 2014
@@ -33,7 +33,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.pig.backend.hadoop.datastorage.HPath;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
@@ -43,6 +42,7 @@ public class TezResourceManager {
private Path stagingDir;
private FileSystem remoteFs;
private Configuration conf;
+ private PigContext pigContext;
public Map<String, Path> resources = new HashMap<String, Path>();
static public TezResourceManager getInstance() {
@@ -54,9 +54,10 @@ public class TezResourceManager {
public void init(PigContext pigContext, Configuration conf) throws IOException {
if (!inited) {
- this.stagingDir = ((HPath)FileLocalizer.getTemporaryResourcePath(pigContext)).getPath();
+ this.stagingDir = FileLocalizer.getTemporaryResourcePath(pigContext);
this.remoteFs = FileSystem.get(conf);
this.conf = conf;
+ this.pigContext = pigContext;
this.inited = true;
}
}
@@ -77,7 +78,7 @@ public class TezResourceManager {
}
// Ship the local resource to the staging directory on the remote FS
- if (uri.toString().startsWith("file:")) {
+ if (!pigContext.getExecType().isLocal() && uri.toString().startsWith("file:")) {
Path remoteFsPath = remoteFs.makeQualified(new Path(stagingDir, resourceName));
remoteFs.copyFromLocalFile(resourcePath, remoteFsPath);
remoteFs.setReplication(remoteFsPath, (short)conf.getInt(Job.SUBMIT_REPLICATION, 3));
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java Thu Nov 27 12:49:54 2014
@@ -29,8 +29,11 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezJob.TezJobConfig;
import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.tools.pigstats.tez.TezScriptState;
import org.apache.tez.client.TezAppMasterStatus;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.TezConfiguration;
@@ -80,9 +83,13 @@ public class TezSessionManager {
private static List<SessionInfo> sessionPool = new ArrayList<SessionInfo>();
private static SessionInfo createSession(Configuration conf,
- Map<String, LocalResource> requestedAMResources, Credentials creds)
- throws TezException, IOException, InterruptedException {
+ Map<String, LocalResource> requestedAMResources, Credentials creds,
+ TezJobConfig tezJobConf) throws TezException, IOException,
+ InterruptedException {
TezConfiguration amConf = MRToTezHelper.getDAGAMConfFromMRConf(conf);
+ TezScriptState ss = TezScriptState.get();
+ ss.addDAGSettingsToConf(amConf);
+ adjustAMConfig(amConf, tezJobConf);
String jobName = conf.get(PigContext.JOB_NAME, "pig");
TezClient tezClient = TezClient.create(jobName, amConf, true, requestedAMResources, creds);
tezClient.start();
@@ -94,6 +101,56 @@ public class TezSessionManager {
return new SessionInfo(tezClient, requestedAMResources);
}
+ private static void adjustAMConfig(TezConfiguration amConf, TezJobConfig tezJobConf) {
+ int requiredAMMaxHeap = -1;
+ int requiredAMResourceMB = -1;
+ int configuredAMMaxHeap = Utils.extractHeapSizeInMB(amConf.get(
+ TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
+ TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT));
+ int configuredAMResourceMB = amConf.getInt(
+ TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB,
+ TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB_DEFAULT);
+
+ if (tezJobConf.getEstimatedTotalParallelism() > 0) {
+
+ int minAMMaxHeap = 3584;
+ int minAMResourceMB = 4096;
+
+ // Rough estimation. For 5K tasks 1G Xmx and 1.5G resource.mb
+ // Increment by 512 mb for every additional 5K tasks.
+ for (int taskCount = 30000; taskCount >= 5000; taskCount-=5000) {
+ if (tezJobConf.getEstimatedTotalParallelism() > taskCount) {
+ requiredAMMaxHeap = minAMMaxHeap;
+ requiredAMResourceMB = minAMResourceMB;
+ break;
+ }
+ minAMMaxHeap = minAMMaxHeap - 512;
+ minAMResourceMB = minAMResourceMB - 512;
+ }
+
+ if (requiredAMResourceMB > -1 && configuredAMResourceMB < requiredAMResourceMB) {
+ amConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, requiredAMResourceMB);
+ log.info("Increasing "
+ + TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB + " from "
+ + configuredAMResourceMB + " to "
+ + requiredAMResourceMB
+ + " as the number of total estimated tasks is "
+ + tezJobConf.getEstimatedTotalParallelism());
+
+ if (requiredAMMaxHeap > -1 && configuredAMMaxHeap < requiredAMMaxHeap) {
+ amConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
+ amConf.get(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS)
+ + " -Xmx" + requiredAMMaxHeap + "M");
+ log.info("Increasing Tez AM Heap Size from "
+ + configuredAMMaxHeap + "M to "
+ + requiredAMMaxHeap
+ + "M as the number of total estimated tasks is "
+ + tezJobConf.getEstimatedTotalParallelism());
+ }
+ }
+ }
+ }
+
private static boolean validateSessionResources(SessionInfo currentSession,
Map<String, LocalResource> requestedAMResources)
throws TezException, IOException {
@@ -106,7 +163,7 @@ public class TezSessionManager {
}
static TezClient getClient(Configuration conf, Map<String, LocalResource> requestedAMResources,
- Credentials creds) throws TezException, IOException, InterruptedException {
+ Credentials creds, TezJobConfig tezJobConf) throws TezException, IOException, InterruptedException {
List<SessionInfo> sessionsToRemove = new ArrayList<SessionInfo>();
SessionInfo newSession = null;
sessionPoolLock.readLock().lock();
@@ -135,11 +192,12 @@ public class TezSessionManager {
// We cannot find available AM, create new one
// Create session outside of locks so that getClient/freeSession is not
// blocked for parallel embedded pig runs
- newSession = createSession(conf, requestedAMResources, creds);
+ newSession = createSession(conf, requestedAMResources, creds, tezJobConf);
newSession.inUse = true;
sessionPoolLock.writeLock().lock();
try {
if (shutdown == true) {
+ log.info("Shutting down Tez session " + newSession.session);
newSession.session.stop();
throw new IOException("TezSessionManager is shut down");
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java Thu Nov 27 12:49:54 2014
@@ -26,6 +26,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -35,7 +36,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.classification.InterfaceAudience;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
@InterfaceAudience.Private
public class MRToTezHelper {
@@ -117,6 +117,12 @@ public class MRToTezHelper {
dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES, ""
+ amCores);
+ dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_VIEW_ACLS,
+ tezConf.get(MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
+
+ dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_MODIFY_ACLS,
+ tezConf.get(MRJobConfig.JOB_ACL_MODIFY_JOB, MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));
+
dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, ""
+ dagAMConf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java Thu Nov 27 12:49:54 2014
@@ -1,7 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.pig.backend.hadoop.executionengine.tez.util;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
import org.apache.pig.PigException;
@@ -12,19 +30,21 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.tez.POLocalRearrangeTez;
-import org.apache.pig.backend.hadoop.executionengine.tez.POStoreTez;
-import org.apache.pig.backend.hadoop.executionengine.tez.POValueOutputTez;
-import org.apache.pig.backend.hadoop.executionengine.tez.RoundRobinPartitioner;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezEdgeDescriptor;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.RoundRobinPartitioner;
import org.apache.pig.data.DataType;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.runtime.library.input.UnorderedKVInput;
import org.apache.tez.runtime.library.output.UnorderedKVOutput;
@@ -172,4 +192,19 @@ public class TezCompilerUtil {
edge.setIntermediateOutputValueClass(TUPLE_CLASS);
}
+ /**
+ * Returns true if there are no loads or stores in a TezOperator.
+ * To be called only after LoaderProcessor is called
+ */
+ static public boolean isIntermediateReducer(TezOperator tezOper) throws VisitorException {
+ boolean intermediateReducer = false;
+ LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(tezOper.plan, POStore.class);
+ // Not map and not final reducer
+ if (stores.size() <= 0 &&
+ (tezOper.getLoaderInfo().getLoads() == null || tezOper.getLoaderInfo().getLoads().size() <= 0)) {
+ intermediateReducer = true;
+ }
+ return intermediateReducer;
+ }
+
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java Thu Nov 27 12:49:54 2014
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.pig.backend.hadoop.executionengine.util;
import java.util.List;
@@ -5,6 +22,8 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.Accumulator;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.BinaryExpressionOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
@@ -27,6 +46,17 @@ import org.apache.pig.impl.PigContext;
public class AccumulatorOptimizerUtil {
private static final Log LOG = LogFactory.getLog(AccumulatorOptimizerUtil.class);
+ public static int getAccumulativeBatchSize() {
+ int batchSize = 20000;
+ if (PigMapReduce.sJobConfInternal.get() != null) {
+ String size = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PIG_ACCUMULATIVE_BATCHSIZE);
+ if (size != null) {
+ batchSize = Integer.parseInt(size);
+ }
+ }
+ return batchSize;
+ }
+
public static void addAccumulator(PhysicalPlan plan) {
// See if this is a map-reduce job
List<PhysicalOperator> pos = plan.getRoots();
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java Thu Nov 27 12:49:54 2014
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.pig.backend.hadoop.executionengine.util;
import java.util.ArrayList;
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Thu Nov 27 12:49:54 2014
@@ -293,11 +293,25 @@ public class MapRedUtil {
}
};
+ public static long getPathLength(FileSystem fs, FileStatus status)
+ throws IOException{
+ return getPathLength(fs, status, Long.MAX_VALUE);
+ }
+
/**
- * Returns the total number of bytes for this file,
- * or if a directory all files in the directory.
+ * Returns the total number of bytes for this file, or if a directory all
+ * files in the directory.
+ *
+ * @param fs FileSystem
+ * @param status FileStatus
+ * @param max Threshold on the total length that triggers an early exit.
+ * Often the caller only needs to know whether the total length of the
+ * files is greater than some value X. In that case the function can
+ * return early, as soon as the running total exceeds max.
+ * @return total length in bytes; may be a partial sum once it exceeds max
+ * @throws IOException
*/
- public static long getPathLength(FileSystem fs, FileStatus status)
+ public static long getPathLength(FileSystem fs, FileStatus status, long max)
throws IOException {
if (!status.isDir()) {
return status.getLen();
@@ -306,7 +320,8 @@ public class MapRedUtil {
status.getPath(), hiddenFileFilter);
long size = 0;
for (FileStatus child : children) {
- size += getPathLength(fs, child);
+ size += getPathLength(fs, child, max);
+ if (size > max) return size;
}
return size;
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java Thu Nov 27 12:49:54 2014
@@ -38,18 +38,16 @@ public class ParallelConstantVisitor ext
@Override
public void visitConstant(ConstantExpression cnst) throws VisitorException {
- if (cnst.getRequestedParallelism() == -1) {
- Object obj = cnst.getValue();
- if (obj instanceof Integer) {
- if (replaced) {
- // sample job should have only one ConstantExpression
- throw new VisitorException("Invalid reduce plan: more " +
- "than one ConstantExpression found in sampling job");
- }
- cnst.setValue(rp);
- cnst.setRequestedParallelism(rp);
- replaced = true;
+ Object obj = cnst.getValue();
+ if (obj instanceof Integer) {
+ if (replaced) {
+ // sample job should have only one ConstantExpression
+ throw new VisitorException("Invalid reduce plan: more " +
+ "than one ConstantExpression found in sampling job");
}
+ cnst.setValue(rp);
+ cnst.setRequestedParallelism(rp);
+ replaced = true;
}
}
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseBinaryConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseBinaryConverter.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseBinaryConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseBinaryConverter.java Thu Nov 27 12:49:54 2014
@@ -93,7 +93,7 @@ public class HBaseBinaryConverter implem
@Override
public Map<String, Object> bytesToMap(byte[] b, ResourceFieldSchema fieldSchema) throws IOException {
- return bytesToMap(b, fieldSchema);
+ throw new ExecException("Can't generate a Map from byte[]");
}
/**
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Thu Nov 27 12:49:54 2014
@@ -24,6 +24,8 @@ import java.lang.reflect.Method;
import java.lang.reflect.UndeclaredThrowableException;
import java.math.BigDecimal;
import java.math.BigInteger;
+import java.net.MalformedURLException;
+import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -74,6 +76,7 @@ import org.apache.hadoop.mapreduce.Outpu
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.pig.CollectableLoadFunc;
import org.apache.pig.LoadCaster;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadPushDown;
@@ -82,8 +85,10 @@ import org.apache.pig.OrderedLoadFunc;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.StoreResources;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.hbase.HBaseTableInputFormat.HBaseTableIFBuilder;
+import org.apache.pig.builtin.FuncUtils;
import org.apache.pig.builtin.Utf8StorageConverter;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
@@ -135,7 +140,8 @@ import com.google.common.collect.Lists;
* <code>buddies</code> column family in the <code>SampleTableCopy</code> table.
*
*/
-public class HBaseStorage extends LoadFunc implements StoreFuncInterface, LoadPushDown, OrderedLoadFunc {
+public class HBaseStorage extends LoadFunc implements StoreFuncInterface, LoadPushDown, OrderedLoadFunc, StoreResources,
+ CollectableLoadFunc {
private static final Log LOG = LogFactory.getLog(HBaseStorage.class);
@@ -317,7 +323,7 @@ public class HBaseStorage extends LoadFu
if ("true".equalsIgnoreCase(value) || "".equalsIgnoreCase(value) || value == null) {//the empty string and null check is for backward compat.
noWAL_ = true;
}
- }
+ }
if (configuredOptions_.hasOption("minTimestamp")){
minTimestamp_ = Long.parseLong(configuredOptions_.getOptionValue("minTimestamp"));
@@ -719,7 +725,6 @@ public class HBaseStorage extends LoadFu
Properties udfProps = getUDFProperties();
job.getConfiguration().setBoolean("pig.noSplitCombination", true);
- initializeHBaseClassLoaderResources(job);
m_conf = initializeLocalJobConfig(job);
String delegationTokenSet = udfProps.getProperty(HBASE_TOKEN_SET);
if (delegationTokenSet == null) {
@@ -748,14 +753,23 @@ public class HBaseStorage extends LoadFu
}
}
- private void initializeHBaseClassLoaderResources(Job job) throws IOException {
+ @Override
+ public List<String> getShipFiles() {
// Depend on HBase to do the right thing when available, as of HBASE-9165
try {
Method addHBaseDependencyJars =
TableMapReduceUtil.class.getMethod("addHBaseDependencyJars", Configuration.class);
if (addHBaseDependencyJars != null) {
- addHBaseDependencyJars.invoke(null, job.getConfiguration());
- return;
+ Configuration conf = new Configuration();
+ addHBaseDependencyJars.invoke(null, conf);
+ if (conf.get("tmpjars") != null) {
+ String[] tmpjars = conf.getStrings("tmpjars");
+ List<String> shipFiles = new ArrayList<String>(tmpjars.length);
+ for (String tmpjar : tmpjars) {
+ shipFiles.add(new URL(tmpjar).getPath());
+ }
+ return shipFiles;
+ }
}
} catch (NoSuchMethodException e) {
LOG.debug("TableMapReduceUtils#addHBaseDependencyJars not available."
@@ -766,32 +780,32 @@ public class HBaseStorage extends LoadFu
} catch (InvocationTargetException e) {
LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation"
+ " failed. Falling back to previous logic.", e);
+ } catch (MalformedURLException e) {
+ LOG.debug("TableMapReduceUtils#addHBaseDependencyJars tmpjars"
+ + " had malformed url. Falling back to previous logic.", e);
}
- // fall back to manual class handling.
- // Make sure the HBase, ZooKeeper, and Guava jars get shipped.
- TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
- org.apache.hadoop.hbase.client.HTable.class, // main hbase jar or hbase-client
- org.apache.hadoop.hbase.mapreduce.TableSplit.class, // main hbase jar or hbase-server
- com.google.common.collect.Lists.class, // guava
- org.apache.zookeeper.ZooKeeper.class); // zookeeper
+
+ List<Class> classList = new ArrayList<Class>();
+ classList.add(org.apache.hadoop.hbase.client.HTable.class); // main hbase jar or hbase-client
+ classList.add(org.apache.hadoop.hbase.mapreduce.TableSplit.class); // main hbase jar or hbase-server
+ classList.add(com.google.common.collect.Lists.class); // guava
+ classList.add(org.apache.zookeeper.ZooKeeper.class); // zookeeper
// Additional jars that are specific to v0.95.0+
- addClassToJobIfExists(job, "org.cloudera.htrace.Trace"); // htrace
- addClassToJobIfExists(job, "org.apache.hadoop.hbase.protobuf.generated.HBaseProtos"); // hbase-protocol
- addClassToJobIfExists(job, "org.apache.hadoop.hbase.TableName"); // hbase-common
- addClassToJobIfExists(job, "org.apache.hadoop.hbase.CompatibilityFactory"); // hbase-hadoop-compar
- addClassToJobIfExists(job, "org.jboss.netty.channel.ChannelFactory"); // netty
- }
-
- private void addClassToJobIfExists(Job job, String className) throws IOException {
- Class klass = null;
- try {
- klass = Class.forName(className);
- } catch (ClassNotFoundException e) {
- LOG.debug("Skipping adding jar for class: " + className);
- return;
- }
+ addClassToList("org.cloudera.htrace.Trace", classList); // htrace
+ addClassToList("org.apache.hadoop.hbase.protobuf.generated.HBaseProtos", classList); // hbase-protocol
+ addClassToList("org.apache.hadoop.hbase.TableName", classList); // hbase-common
+ addClassToList("org.apache.hadoop.hbase.CompatibilityFactory", classList); // hbase-hadoop-compat
+ addClassToList("org.jboss.netty.channel.ChannelFactory", classList); // netty
+ return FuncUtils.getShipFiles(classList);
+ }
- TableMapReduceUtil.addDependencyJars(job.getConfiguration(), klass);
+ private void addClassToList(String className, List<Class> classList) {
+ try {
+ Class klass = Class.forName(className);
+ classList.add(klass);
+ } catch (ClassNotFoundException e) {
+ LOG.debug("Skipping adding jar for class: " + className);
+ }
}
private JobConf initializeLocalJobConfig(Job job) {
@@ -940,21 +954,25 @@ public class HBaseStorage extends LoadFu
DataType.findType(t.get(i)) : fieldSchemas[i].getType()));
} else {
Map<String, Object> cfMap = (Map<String, Object>) t.get(i);
- for (String colName : cfMap.keySet()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("putNext - colName=" + colName +
- ", class: " + colName.getClass());
+ if (cfMap!=null) {
+ for (String colName : cfMap.keySet()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("putNext - colName=" + colName +
+ ", class: " + colName.getClass());
+ }
+ // TODO deal with the fact that maps can have types now. Currently we detect types at
+ // runtime in the case of storing to a cf, which is suboptimal.
+ put.add(columnInfo.getColumnFamily(), Bytes.toBytes(colName.toString()), ts,
+ objToBytes(cfMap.get(colName), DataType.findType(cfMap.get(colName))));
}
- // TODO deal with the fact that maps can have types now. Currently we detect types at
- // runtime in the case of storing to a cf, which is suboptimal.
- put.add(columnInfo.getColumnFamily(), Bytes.toBytes(colName.toString()), ts,
- objToBytes(cfMap.get(colName), DataType.findType(cfMap.get(colName))));
}
}
}
try {
- writer.write(null, put);
+ if (!put.isEmpty()) {
+ writer.write(null, put);
+ }
} catch (InterruptedException e) {
throw new IOException(e);
}
@@ -1031,7 +1049,6 @@ public class HBaseStorage extends LoadFu
schema_ = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
}
- initializeHBaseClassLoaderResources(job);
m_conf = initializeLocalJobConfig(job);
// Not setting a udf property and getting the hbase delegation token
// only once like in setLocation as setStoreLocation gets different Job
@@ -1115,28 +1132,24 @@ public class HBaseStorage extends LoadFu
return new RequiredFieldResponse(true);
}
- @Override
- public WritableComparable<InputSplit> getSplitComparable(InputSplit split)
- throws IOException {
- return new WritableComparable<InputSplit>() {
- TableSplit tsplit = new TableSplit();
-
- @Override
- public void readFields(DataInput in) throws IOException {
- tsplit.readFields(in);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- tsplit.write(out);
- }
+ public void ensureAllKeyInstancesInSameSplit() throws IOException {
+ /**
+ * no-op because hbase keys are unique
+ * This will also work with things like DelimitedKeyPrefixRegionSplitPolicy
+ * if you need a partial key match to be included in the split
+ */
+ LOG.debug("ensureAllKeyInstancesInSameSplit");
+ }
- @Override
- public int compareTo(InputSplit split) {
- return tsplit.compareTo((TableSplit) split);
- }
- };
+ @Override
+ public WritableComparable<TableSplit> getSplitComparable(InputSplit split) throws IOException {
+ if (split instanceof TableSplit) {
+ return new TableSplitComparable((TableSplit) split);
+ } else {
+ throw new RuntimeException("LoadFunc expected split of type TableSplit but was " + split.getClass().getName());
+ }
}
+
/**
* Class to encapsulate logic around which column names were specified in each
Modified: pig/branches/spark/src/org/apache/pig/builtin/ABS.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/ABS.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/ABS.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/ABS.java Thu Nov 27 12:49:54 2014
@@ -42,7 +42,7 @@ public class ABS extends EvalFunc<Double
* @return output returns a single numeric value, absolute value of the argument
*/
public Double exec(Tuple input) throws IOException {
- if (input == null || input.size() == 0)
+ if (input == null || input.size() == 0 || input.get(0) == null)
return null;
Double d;
Modified: pig/branches/spark/src/org/apache/pig/builtin/AvroStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/AvroStorage.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/AvroStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/AvroStorage.java Thu Nov 27 12:49:54 2014
@@ -31,6 +31,7 @@ import org.apache.avro.Schema.Type;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericData;
+import org.apache.avro.mapred.AvroInputFormat;
import org.apache.avro.mapred.AvroOutputFormat;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -63,9 +64,11 @@ import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.StoreFunc;
import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.StoreResources;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.JarManager;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.impl.util.avro.AvroArrayReader;
@@ -82,7 +85,7 @@ import com.google.common.collect.Maps;
*
*/
public class AvroStorage extends LoadFunc
- implements StoreFuncInterface, LoadMetadata, LoadPushDown {
+ implements StoreFuncInterface, LoadMetadata, LoadPushDown, StoreResources {
/**
* Creates new instance of Pig Storage function, without specifying
@@ -593,7 +596,11 @@ public class AvroStorage extends LoadFun
} else {
rr = new AvroRecordReader(s);
}
- rr.initialize(is, tc);
+ try {
+ rr.initialize(is, tc);
+ } finally {
+ rr.close();
+ }
tc.setStatus(is.toString());
return rr;
}
@@ -674,4 +681,9 @@ public class AvroStorage extends LoadFun
}
+ @Override
+ public List<String> getShipFiles() {
+ Class[] classList = new Class[] {Schema.class, AvroInputFormat.class};
+ return FuncUtils.getShipFiles(classList);
+ }
}
Modified: pig/branches/spark/src/org/apache/pig/builtin/BigDecimalAbs.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/BigDecimalAbs.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/BigDecimalAbs.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/BigDecimalAbs.java Thu Nov 27 12:49:54 2014
@@ -27,6 +27,8 @@ import org.apache.pig.data.Tuple;
public class BigDecimalAbs extends EvalFunc<BigDecimal> {
@Override
public BigDecimal exec(Tuple input) throws IOException {
+ if (input == null || input.size() == 0 || input.get(0) == null)
+ return null;
return ((BigDecimal)input.get(0)).abs();
}
Modified: pig/branches/spark/src/org/apache/pig/builtin/BigIntegerAbs.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/BigIntegerAbs.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/BigIntegerAbs.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/BigIntegerAbs.java Thu Nov 27 12:49:54 2014
@@ -27,6 +27,8 @@ import org.apache.pig.data.Tuple;
public class BigIntegerAbs extends EvalFunc<BigInteger> {
@Override
public BigInteger exec(Tuple input) throws IOException {
+ if (input == null || input.size() == 0 || input.get(0) == null)
+ return null;
return ((BigInteger)input.get(0)).abs();
}
Modified: pig/branches/spark/src/org/apache/pig/builtin/Bloom.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/Bloom.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/Bloom.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/Bloom.java Thu Nov 27 12:49:54 2014
@@ -21,7 +21,9 @@ package org.apache.pig.builtin;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
+import java.io.File;
import java.io.FileInputStream;
+import java.io.FilenameFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -29,7 +31,6 @@ import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
-
import org.apache.pig.FilterFunc;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
@@ -94,9 +95,22 @@ public class Bloom extends FilterFunc {
private void init() throws IOException {
filter = new BloomFilter();
- String dcFile = "./" + getFilenameFromPath(bloomFile) +
- "/part-r-00000";
- filter.readFields(new DataInputStream(new FileInputStream(dcFile)));
+ String dir = "./" + getFilenameFromPath(bloomFile);
+ String[] partFiles = new File(dir)
+ .list(new FilenameFilter() {
+ @Override
+ public boolean accept(File current, String name) {
+ return name.startsWith("part");
+ }
+ });
+
+ String dcFile = dir + "/" + partFiles[0];
+ DataInputStream dis = new DataInputStream(new FileInputStream(dcFile));
+ try {
+ filter.readFields(dis);
+ } finally {
+ dis.close();
+ }
}
/**
Modified: pig/branches/spark/src/org/apache/pig/builtin/BuildBloom.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/BuildBloom.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/BuildBloom.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/BuildBloom.java Thu Nov 27 12:49:54 2014
@@ -39,15 +39,15 @@ import org.apache.pig.impl.logicalLayer.
* define bb BuildBloom('jenkins', '100', '0.1');
* A = load 'foo' as (x, y);
* B = group A all;
- * C = foreach B generate BuildBloom(A.x);
+ * C = foreach B generate bb(A.x);
* store C into 'mybloom';
* The bloom filter can be on multiple keys by passing more than one field
* (or the entire bag) to BuildBloom.
* The resulting file can then be used in a Bloom filter as:
- * define bloom Bloom(mybloom);
+ * define bloom Bloom('mybloom');
* A = load 'foo' as (x, y);
* B = load 'bar' as (z);
- * C = filter B by Bloom(z);
+ * C = filter B by bloom(z);
* D = join C by z, A by x;
* It uses {@link org.apache.hadoop.util.bloom.BloomFilter}.
*/
Modified: pig/branches/spark/src/org/apache/pig/builtin/Distinct.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/Distinct.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/Distinct.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/Distinct.java Thu Nov 27 12:49:54 2014
@@ -22,6 +22,9 @@ import java.io.IOException;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
+import org.apache.pig.JVMReuseManager;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.StaticDataCleanup;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.data.BagFactory;
@@ -38,35 +41,35 @@ import org.apache.pig.data.TupleFactory;
*/
public class Distinct extends EvalFunc<DataBag> implements Algebraic {
- private static BagFactory bagFactory = BagFactory.getInstance();
private static TupleFactory tupleFactory = TupleFactory.getInstance();
- /* (non-Javadoc)
- * @see org.apache.pig.EvalFunc#exec(org.apache.pig.data.Tuple)
- */
+ private static boolean initialized = false;
+ private static boolean useDefaultBag = false;
+
+ static {
+ JVMReuseManager.getInstance().registerForStaticDataCleanup(Distinct.class);
+ }
+
+ @StaticDataCleanup
+ public static void staticDataCleanup() {
+ initialized = false;
+ useDefaultBag = false;
+ }
+
@Override
public DataBag exec(Tuple input) throws IOException {
return getDistinct(input);
}
- /* (non-Javadoc)
- * @see org.apache.pig.Algebraic#getFinal()
- */
@Override
public String getFinal() {
return Final.class.getName();
}
- /* (non-Javadoc)
- * @see org.apache.pig.Algebraic#getInitial()
- */
@Override
public String getInitial() {
return Initial.class.getName();
}
- /* (non-Javadoc)
- * @see org.apache.pig.Algebraic#getIntermed()
- */
@Override
public String getIntermed() {
return Intermediate.class.getName();
@@ -74,13 +77,10 @@ public class Distinct extends EvalFunc<
static public class Initial extends EvalFunc<Tuple> {
- /* (non-Javadoc)
- * @see org.apache.pig.EvalFunc#exec(org.apache.pig.data.Tuple)
- */
@Override
public Tuple exec(Tuple input) throws IOException {
// the input has a single field which is a tuple
- // representing the data we want to distinct.
+ // representing the data we want to distinct.
// unwrap, put in a bag and send down
try {
Tuple single = (Tuple)input.get(0);
@@ -94,9 +94,6 @@ public class Distinct extends EvalFunc<
static public class Intermediate extends EvalFunc<Tuple> {
- /* (non-Javadoc)
- * @see org.apache.pig.EvalFunc#exec(org.apache.pig.data.Tuple)
- */
@Override
public Tuple exec(Tuple input) throws IOException {
return tupleFactory.newTuple(getDistinctFromNestedBags(input, this));
@@ -105,30 +102,27 @@ public class Distinct extends EvalFunc<
static public class Final extends EvalFunc<DataBag> {
- /* (non-Javadoc)
- * @see org.apache.pig.EvalFunc#exec(org.apache.pig.data.Tuple)
- */
@Override
public DataBag exec(Tuple input) throws IOException {
return getDistinctFromNestedBags(input, this);
}
}
-
- static private DataBag createDataBag() {
- // by default, we create InternalSortedBag, unless user configures
- // explicitly to use old bag
- String bagType = null;
- if (PigMapReduce.sJobConfInternal.get() != null) {
- bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.distinct.type");
- }
-
- if (bagType != null && bagType.equalsIgnoreCase("default")) {
- return BagFactory.getInstance().newDistinctBag();
- } else {
- return new InternalDistinctBag(3);
- }
+
+ private static DataBag createDataBag() {
+ if (!initialized) {
+ initialized = true;
+ if (PigMapReduce.sJobConfInternal.get() != null) {
+ String bagType = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PIG_CACHEDBAG_DISTINCT_TYPE);
+ if (bagType != null && bagType.equalsIgnoreCase("default")) {
+ useDefaultBag = true;
+ }
+ }
+ }
+ // by default, we create InternalDistinctBag, unless user configures
+ // explicitly to use old bag
+ return useDefaultBag ? BagFactory.getInstance().newDistinctBag() : new InternalDistinctBag(3);
}
-
+
static private DataBag getDistinctFromNestedBags(Tuple input, EvalFunc evalFunc) throws IOException {
DataBag result = createDataBag();
long progressCounter = 0;
@@ -144,7 +138,7 @@ public class Distinct extends EvalFunc<
for (Tuple t : (DataBag)tuple.get(0)) {
result.add(t);
++progressCounter;
- if((progressCounter % 1000) == 0){
+ if((progressCounter % 1000) == 0){
evalFunc.progress();
}
}
@@ -154,7 +148,7 @@ public class Distinct extends EvalFunc<
}
return result;
}
-
+
protected DataBag getDistinct(Tuple input) throws IOException {
try {
DataBag inputBg = (DataBag)input.get(0);
Modified: pig/branches/spark/src/org/apache/pig/builtin/DoubleRound.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/DoubleRound.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/DoubleRound.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/DoubleRound.java Thu Nov 27 12:49:54 2014
@@ -37,7 +37,7 @@ public class DoubleRound extends EvalFun
*/
@Override
public Long exec(Tuple input) throws IOException {
- if (input == null || input.size() == 0)
+ if (input == null || input.size() == 0 || input.get(0) == null)
return null;
try{
Modified: pig/branches/spark/src/org/apache/pig/builtin/FloatRound.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/FloatRound.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/FloatRound.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/FloatRound.java Thu Nov 27 12:49:54 2014
@@ -39,7 +39,7 @@ public class FloatRound extends EvalFunc
*/
@Override
public Integer exec(Tuple input) throws IOException {
- if (input == null || input.size() == 0)
+ if (input == null || input.size() == 0 || input.get(0) == null)
return null;
try{
Modified: pig/branches/spark/src/org/apache/pig/builtin/JsonLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/JsonLoader.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/JsonLoader.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/JsonLoader.java Thu Nov 27 12:49:54 2014
@@ -19,28 +19,26 @@ package org.apache.pig.builtin;
import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.math.BigDecimal;
-import java.math.BigInteger;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
import org.joda.time.format.ISODateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
-
import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.JsonToken;
-
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-
import org.apache.pig.Expression;
import org.apache.pig.LoadCaster;
import org.apache.pig.LoadFunc;
@@ -56,6 +54,7 @@ import org.apache.pig.data.DataByteArray
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.util.JarManager;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.parser.ParserException;
@@ -153,23 +152,31 @@ public class JsonLoader extends LoadFunc
// isn't what we expect we return a tuple with null fields rather than
// throwing an exception. That way a few mangled lines don't fail the
// job.
- if (p.nextToken() != JsonToken.START_OBJECT) {
- warn("Bad record, could not find start of record " +
- val.toString(), PigWarning.UDF_WARNING_1);
- return t;
- }
-
- // Read each field in the record
- for (int i = 0; i < fields.length; i++) {
- t.set(i, readField(p, fields[i], i));
- }
-
- if (p.nextToken() != JsonToken.END_OBJECT) {
- warn("Bad record, could not find end of record " +
- val.toString(), PigWarning.UDF_WARNING_1);
- return t;
+
+ try {
+ if (p.nextToken() != JsonToken.START_OBJECT) {
+ warn("Bad record, could not find start of record " +
+ val.toString(), PigWarning.UDF_WARNING_1);
+ return t;
+ }
+
+ // Read each field in the record
+ for (int i = 0; i < fields.length; i++) {
+ t.set(i, readField(p, fields[i], i));
+ }
+
+ if (p.nextToken() != JsonToken.END_OBJECT) {
+ warn("Bad record, could not find end of record " +
+ val.toString(), PigWarning.UDF_WARNING_1);
+ return t;
+ }
+
+ } catch (JsonParseException jpe) {
+ warn("Bad record, returning null for " + val, PigWarning.UDF_WARNING_1);
+ } finally {
+ p.close();
}
- p.close();
+
return t;
}
@@ -242,7 +249,7 @@ public class JsonLoader extends LoadFunc
case DataType.BIGDECIMAL:
tok = p.nextToken();
if (tok == JsonToken.VALUE_NULL) return null;
- return p.getDecimalValue();
+ return new BigDecimal(p.getText());
case DataType.MAP:
// Should be a start of the map object
@@ -372,4 +379,11 @@ public class JsonLoader extends LoadFunc
throws IOException {
// We don't have partitions
}
+
+ /**
+  * Reports the files to ship for this loader, derived by
+  * {@code FuncUtils.getShipFiles} from Jackson's {@code JsonFactory} class.
+  *
+  * @return the list produced by {@code FuncUtils.getShipFiles}
+  *         (presumably the jars containing the listed classes; confirm
+  *         against {@code FuncUtils})
+  */
+ @Override
+ public List<String> getShipFiles() {
+     Class[] classList = new Class[] {JsonFactory.class};
+     return FuncUtils.getShipFiles(classList);
+ }
}
Modified: pig/branches/spark/src/org/apache/pig/builtin/JsonStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/JsonStorage.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/JsonStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/JsonStorage.java Thu Nov 27 12:49:54 2014
@@ -19,17 +19,15 @@ package org.apache.pig.builtin;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.math.BigDecimal;
import java.math.BigInteger;
-import org.joda.time.DateTime;
-
import org.codehaus.jackson.JsonEncoding;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
-
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
@@ -38,25 +36,18 @@ import org.apache.hadoop.mapreduce.Outpu
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.StoreMetadata;
import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreResources;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.DataBag;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.TreeMap;
-
/**
* A JSON Pig store function. Each Pig tuple is stored on one line (as one
* value for TextOutputFormat) so that it can be read easily using
@@ -66,7 +57,7 @@ import java.util.TreeMap;
* with mapping between JSON and Pig types. The schema file share the same format
* as the one we use in PigStorage.
*/
-public class JsonStorage extends StoreFunc implements StoreMetadata {
+public class JsonStorage extends StoreFunc implements StoreMetadata, StoreResources {
protected RecordWriter writer = null;
protected ResourceSchema schema = null;
@@ -318,4 +309,14 @@ public class JsonStorage extends StoreFu
return s;
}
+ /**
+  * Reports the files to ship for this store function, derived by
+  * {@code FuncUtils.getShipFiles} from Jackson's {@code JsonFactory} class.
+  *
+  * @return the list produced by {@code FuncUtils.getShipFiles}
+  */
+ @Override
+ public List<String> getShipFiles() {
+     return FuncUtils.getShipFiles(new Class[] {JsonFactory.class});
+ }
+
+ /**
+  * Reports the cache files for this store function.
+  *
+  * @return always {@code null}
+  */
+ @Override
+ public List<String> getCacheFiles() {
+     return null;
+ }
}
Modified: pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java Thu Nov 27 12:49:54 2014
@@ -20,11 +20,11 @@ package org.apache.pig.builtin;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
+import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -37,7 +37,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
@@ -50,12 +49,13 @@ import org.apache.hadoop.hive.ql.io.orc.
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.shims.HadoopShimsSecure;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
@@ -82,6 +82,7 @@ import org.apache.pig.Expression.BinaryE
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.StoreResources;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataType;
@@ -93,6 +94,7 @@ import org.apache.pig.impl.util.Utils;
import org.apache.pig.impl.util.orc.OrcUtils;
import org.joda.time.DateTime;
+import com.esotericsoftware.kryo.io.Input;
import com.google.common.annotations.VisibleForTesting;
/**
@@ -111,7 +113,7 @@ import com.google.common.annotations.Vis
* <li><code>-v, --version</code> Sets the version of the file that will be written
* </ul>
**/
-public class OrcStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata, LoadPushDown, LoadPredicatePushdown {
+public class OrcStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata, LoadPushDown, LoadPredicatePushdown, StoreResources {
//TODO Make OrcInputFormat.SARG_PUSHDOWN visible
private static final String SARG_PUSHDOWN = "sarg.pushdown";
@@ -384,6 +386,26 @@ public class OrcStorage extends LoadFunc
}
}
+ /**
+  * Reports the files to ship for ORC support, derived by
+  * {@code FuncUtils.getShipFiles} from a fixed set of ORC, Hive, Hive-shims
+  * and Kryo classes.
+  * <p>
+  * The version-specific Hive shims class is looked up by name:
+  * {@code Hadoop23Shims} when {@code Utils.isHadoop23()} or
+  * {@code Utils.isHadoop2()} reports true, otherwise {@code Hadoop20SShims}.
+  *
+  * @return the list produced by {@code FuncUtils.getShipFiles}
+  *         (presumably the jars containing the listed classes; confirm
+  *         against {@code FuncUtils})
+  * @throws RuntimeException if the version-specific shims class cannot be
+  *         loaded; the {@code ClassNotFoundException} is attached as the cause
+  */
+ @Override
+ public List<String> getShipFiles() {
+     String hadoopVersion = "20S";
+     if (Utils.isHadoop23() || Utils.isHadoop2()) {
+         hadoopVersion = "23";
+     }
+     Class hadoopVersionShimsClass;
+     try {
+         hadoopVersionShimsClass = Class.forName("org.apache.hadoop.hive.shims.Hadoop" +
+                 hadoopVersion + "Shims");
+     } catch (ClassNotFoundException e) {
+         // Chain the original exception so the class-loading failure stays diagnosable.
+         throw new RuntimeException("Cannot find Hadoop" + hadoopVersion + "ShimsClass in classpath", e);
+     }
+     Class[] classList = new Class[] {OrcFile.class, HiveConf.class, AbstractSerDe.class,
+             org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, hadoopVersionShimsClass,
+             Input.class};
+     return FuncUtils.getShipFiles(classList);
+ }
+
private static Path getFirstFile(String location, FileSystem fs) throws IOException {
String[] locations = getPathStrings(location);
Path[] paths = new Path[locations.length];
@@ -498,8 +520,6 @@ public class OrcStorage extends LoadFunc
for (ResourceFieldSchema field : schema.getFields()) {
switch(field.getType()) {
case DataType.BOOLEAN:
- // TODO: ORC does not seem to support it
- break;
case DataType.INTEGER:
case DataType.LONG:
case DataType.FLOAT:
@@ -671,14 +691,12 @@ public class OrcStorage extends LoadFunc
}
private Object getSearchArgObjValue(Object value) {
- // TODO Test BigInteger, BigInteger and DateTime
if (value instanceof BigInteger) {
- return HiveDecimal.create(((BigInteger)value));
+ return new BigDecimal((BigInteger)value);
} else if (value instanceof BigDecimal) {
- return HiveDecimal.create(((BigDecimal)value), false);
+ return value;
} else if (value instanceof DateTime) {
- //TODO is this right based on what DateTimeWritable.dateToDays() does? What about pig.datetime.default.tz?
- return new DateWritable((int)(((DateTime)value).getMillis() / TimeUnit.DAYS.toMillis(1)));
+ return new Timestamp(((DateTime)value).getMillis());
} else {
return value;
}
Modified: pig/branches/spark/src/org/apache/pig/builtin/ROUND.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/ROUND.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/ROUND.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/ROUND.java Thu Nov 27 12:49:54 2014
@@ -44,7 +44,7 @@ public class ROUND extends EvalFunc<Long
*/
@Override
public Long exec(Tuple input) throws IOException {
- if (input == null || input.size() == 0)
+ if (input == null || input.size() == 0 || input.get(0) == null)
return null;
try{
Modified: pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java Thu Nov 27 12:49:54 2014
@@ -148,7 +148,9 @@ public class TextLoader extends LoadFunc
@Override
public Map<String, Object> bytesToMap(byte[] b, ResourceFieldSchema schema) throws IOException {
- return bytesToMap(b, schema);
+ int errCode = 2109;
+ String msg = "TextLoader does not support conversion to Map.";
+ throw new ExecException(msg, errCode, PigException.BUG);
}
/**