You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2012/11/01 02:03:21 UTC
svn commit: r1404440 - in /pig/trunk: CHANGES.txt
test/e2e/harness/TestDriver.pm test/e2e/harness/test_harness.pl
test/e2e/pig/build.xml test/e2e/pig/conf/local.conf
test/e2e/pig/deployers/ExistingClusterDeployer.pm
test/e2e/pig/drivers/TestDriverPig.pm
Author: rohini
Date: Thu Nov 1 01:03:20 2012
New Revision: 1404440
URL: http://svn.apache.org/viewvc?rev=1404440&view=rev
Log:
PIG-2898: Parallel execution of e2e tests (iveselovsky via rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/test/e2e/harness/TestDriver.pm
pig/trunk/test/e2e/harness/test_harness.pl
pig/trunk/test/e2e/pig/build.xml
pig/trunk/test/e2e/pig/conf/local.conf
pig/trunk/test/e2e/pig/deployers/ExistingClusterDeployer.pm
pig/trunk/test/e2e/pig/drivers/TestDriverPig.pm
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1404440&r1=1404439&r2=1404440&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Nov 1 01:03:20 2012
@@ -58,6 +58,8 @@ PIG-1891 Enable StoreFunc to make intell
IMPROVEMENTS
+PIG-2898: Parallel execution of e2e tests (iveselovsky via rohini)
+
PIG-2913: org.apache.pig.test.TestPigServerWithMacros fails sometimes because it picks up previous minicluster configuration file (cheolsoo via julien)
PIG-2976: Reduce HBaseStorage logging (billgraham)
Modified: pig/trunk/test/e2e/harness/TestDriver.pm
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/harness/TestDriver.pm?rev=1404440&r1=1404439&r2=1404440&view=diff
==============================================================================
--- pig/trunk/test/e2e/harness/TestDriver.pm (original)
+++ pig/trunk/test/e2e/harness/TestDriver.pm Thu Nov 1 01:03:20 2012
@@ -24,6 +24,10 @@ package TestDriver;
use TestDriverFactory;
use TestReport;
use File::Path;
+use Parallel::ForkManager;
+use FileHandle;
+use File::Copy;
+use File::Basename;
my $passedStr = 'passed';
my $failedStr = 'failed';
@@ -31,6 +35,27 @@ my $abortedStr = 'aborted';
my $skippedStr = 'skipped';
my $dependStr = 'failed_dependency';
+# A constant to be used as a key in hash:
+my $keyGlobalSetupConditionalDone = 'globalSetupConditionaldone';
+
+################################################################################
+# Sub: appendToLength
+# Static helper that right-pads a string with spaces until it is at least
+# the requested number of characters long. Longer strings are not truncated.
+#
+# Parameters:
+# str - the string to pad
+# len - the minimum desired length
+#
+# Returns: the padded string
+#
+sub appendToLength($$) {
+    my ($text, $minLength) = @_;
+    # Grow one space at a time; a string that is already long enough
+    # falls straight through untouched.
+    $text .= " " while length($text) < $minLength;
+    return $text;
+}
##############################################################################
# Sub: printResults
@@ -47,7 +72,7 @@ my $dependStr = 'failed_dependency';
#
sub printResults
{
- my ($testStatuses, $log, $prefix) = @_;
+ my ($testStatuses, $log, $prefix, $confFile, $groupName) = @_;
my ($pass, $fail, $abort, $depend, $skipped) = (0, 0, 0, 0, 0);
@@ -59,11 +84,86 @@ sub printResults
($testStatuses->{$_} eq $skippedStr) && $skipped++;
}
- my $msg = "$prefix, PASSED: $pass FAILED: $fail SKIPPED: $skipped ABORTED: $abort "
- . "FAILED DEPENDENCY: $depend";
+ # Optional " [confFile-groupName]" suffix; callers pass these two values when
+ # running in parallel mode. It is skipped when no (or an empty) conf file
+ # name is given.
+ my $context = "";
+ if (defined($confFile) && (length($confFile) > 0)) {
+ $context = " [$confFile-$groupName]";
+ }
+ # XXX why comma is added there?
+ # Pad the prefix and every counter to a fixed width so that the columns of
+ # consecutive result lines stay aligned.
+ my $msg = appendToLength($prefix . ",", 18)
+ . " PASSED: " . appendToLength($pass,4)
+ . " FAILED: " . appendToLength($fail,4)
+ . " SKIPPED: " . appendToLength($skipped,4)
+ . " ABORTED: " . appendToLength($abort,4)
+ . " FAILED DEPENDENCY: " . appendToLength($depend,4)
+ . $context;
print $log "$msg\n";
- print "$msg\n";
-
+ print "$msg\n";
+}
+
+##############################################################################
+# Copies every key/value pair from sourceHash into targetHash, overwriting
+# any keys that are already present in the target.
+# Returns: void
+# Parameters:
+# 1: targetHash - reference to the hash that receives the pairs,
+# 2: sourceHash - reference to the hash to copy from (it is not modified).
+sub putAll($$)
+{
+    my ($targetHash, $sourceHash) = @_;
+    # Iterate over keys() rather than each(): keys() always starts from the
+    # first entry, whereas each() resumes a half-finished iteration of the
+    # source hash and would silently skip the entries already visited.
+    for my $key (keys %$sourceHash) {
+        $targetHash->{$key} = $sourceHash->{$key};
+    }
+    return;
+}
+
+##############################################################################
+# Appends the contents of one file to the end of another.
+# Parameters:
+# 1: sourceFileName - the file to read,
+# 2: targetFileName - the file to append to.
+# Returns: void
+# Dies if either file cannot be opened, or if copying or closing the target fails.
+sub appendFile($$) {
+    my ($source, $target) = @_;
+    dbg("Appending file [" . Cwd::realpath($source) . "] >> [" . Cwd::realpath($target) . "]\n");
+    # Lexical handles with the open mode passed separately from the file
+    # name, so the name is never interpreted as part of the mode.
+    my $sourceHandle = FileHandle->new($source, "r")
+        or die "Cannot open source file [$source].";
+    my $targetHandle = FileHandle->new($target, "a")
+        or die "cannot open target file [$target].";
+    copy($sourceHandle, $targetHandle)
+        or die "Cannot append [$source] to [$target]: $!";
+    $sourceHandle->close();
+    # Buffered write errors only surface on close, so check it.
+    $targetHandle->close()
+        or die "Cannot close target file [$target]: $!";
+    return;
+}
+
+##############################################################################
+# Diagnostic sub to print the contents of a hash to the currently selected
+# output handle. Prints only when the environment variable E2E_DEBUG is
+# set to 'true'; otherwise it does nothing.
+# Parameters:
+# 1: the hash reference;
+# 2: (optional) a label printed in the header line.
+# Returns: void
+sub dbgDumpHash($;$)
+{
+    my ($myhash, $msg) = @_;
+    # E2E_DEBUG may be unset, so test definedness first instead of comparing
+    # an undefined value with 'eq'.
+    return unless defined($ENV{'E2E_DEBUG'}) && $ENV{'E2E_DEBUG'} eq 'true';
+    # The label is optional; fall back to an empty string.
+    $msg = "" unless defined $msg;
+    print "Dump of hash $msg:\n";
+    # keys() always walks the whole hash, regardless of any earlier each().
+    for my $key (keys %$myhash) {
+        print " [$key] = [$myhash->{$key}]\n";
+    }
+    return;
+}
+
+##############################################################################
+# Diagnostic sub to print a debug output.
+# This is useful when debugging the harness perl scripts.
+# Prints its arguments to the currently selected output handle, but only when
+# the environment variable E2E_DEBUG is set to 'true'.
+# Parameters:
+# 1*: object(s) to be printed, typically one string;
+# Returns: void
+sub dbg(@)
+{
+    # E2E_DEBUG may be unset, so test definedness first instead of comparing
+    # an undefined value with 'eq'.
+    if (defined($ENV{'E2E_DEBUG'}) && $ENV{'E2E_DEBUG'} eq 'true') {
+        print @_;
+    }
+    return;
}
##############################################################################
@@ -97,7 +197,6 @@ sub printGroupResultsXml
my $total= $pass + $fail + $abort;
$report->totals( $groupName, $total, $fail, $abort, $totalDuration );
-
}
##############################################################################
@@ -124,10 +223,13 @@ sub new
##############################################################################
# Sub: globalSetup
-# Set up before any tests are run. This gives each individual driver a chance to do
+# Set up before any tests from the group are run. This gives each individual driver a chance to do
# setup. This function will only be called once, before all the tests are
# run. A driver need not implement it. It is a virtual function.
#
+# This method is invoked unconditionally (always), even if there are no tests to run in the group. See
+# also #globalSetupConditional() description.
+#
# Paramaters:
# globalHash - Top level hash from config file (does not have any group
# or test information in it).
@@ -140,11 +242,32 @@ sub globalSetup
{
}
+##############################################################################
+# Sub: globalSetupConditional
+# Set up before any tests from the test config file (in sequential mode) or
+# test group (in parallel mode) are run. Executes after #globalSetup(), and
+# *only* if there is at least one test to run. Introduced for performance
+# optimization in parallel execution mode.
+# It is a virtual function; this base implementation does nothing.
+# Subclasses may override it.
+#
+# Parameters:
+# globalHash - Top level hash from config file (does not have any group
+# or test information in it).
+# log - log file handle
+#
+# Returns:
+# None
+#
+sub globalSetupConditional
+{
+    # Intentionally empty: drivers that need this hook override it.
+    return;
+}
+
+
###############################################################################
# Sub: globalCleanup
# Clean up after all tests have run. This gives each individual driver a chance to do
# cleanup. This function will only be called once, after all the tests are
# run. A driver need not implement it. It is a virtual function.
+# This method is invoked unconditionally, even if no test in the group was executed.
+# See #globalCleanupConditional() method description.
#
# Paramaters:
# globalHash - Top level hash from config file (does not have any group
@@ -158,6 +281,22 @@ sub globalCleanup
}
###############################################################################
+# Sub: globalCleanupConditional
+# Clean up after all tests have run, before #globalCleanup(). Invoked iff #globalSetupConditional()
+# was previously invoked for this config file (sequential mode) or test group (parallel mode).
+# It is a virtual function; this base implementation does nothing. Subclasses may override it.
+#
+# Parameters:
+# globalHash - Top level hash from config file (does not have any group
+# or test information in it).
+# log - log file handle
+#
+# Returns:
+# None
+sub globalCleanupConditional {
+    # No "()" prototype here: the sub takes arguments, and a prototype is
+    # ignored for method calls anyway.
+    return;
+}
+
+###############################################################################
# Sub: runTest
# Run a test. This is a pure virtual function.
#
@@ -280,12 +419,8 @@ sub run
my ($self, $testsToRun, $testsToMatch, $cfg, $log, $dbh, $testStatuses,
$confFile, $startat, $logname ) = @_;
- my $subName = (caller(0))[3];
- my $msg="";
- my $duration=0;
- my $totalDuration=0;
- my $groupDuration=0;
- my $sawstart = !(defined $startat);
+ my $subName = (caller(0))[3];
+ my $msg="";
# Rather than make each driver handle our multi-level cfg, we'll flatten
# the hashes into one for it.
my %globalHash;
@@ -298,20 +433,129 @@ sub run
$globalHash{$_} = $cfg->{$_};
}
- $globalHash{$_} = $cfg->{$_};
- # Do the global setup
- $self->globalSetup(\%globalHash, $log);
+ my $report=0;
+ my $properties= new Properties(0, $globalHash{'propertiesFile'});
- my $report=0;
- my $properties= new Properties(0, $globalHash{'propertiesFile'});
+ my $fileForkFactor = int($ENV{'FORK_FACTOR_FILE'});
+ my $groupForkFactor = int($ENV{'FORK_FACTOR_GROUP'});
+ # NB: this is to distinguish the sequential mode from parallel one:
+ my $productForkFactor = $fileForkFactor * $groupForkFactor;
+ my $pm;
+ if ($groupForkFactor > 1) {
+ print $log "Group fork factor: $groupForkFactor\n";
+ # Create the fork manager:
+ $pm = new Parallel::ForkManager($groupForkFactor);
+ # this is a callback method that will run in the main process on each job subprocess completion:
+ $pm -> run_on_finish (
+ sub {
+ my ($pid, $exit_code, $identification, $exit_signal, $core_dump, $data_structure_reference) = @_;
+ # see what the child sent us, if anything
+ if (defined($data_structure_reference)) {
+ dbg("Group subprocess [$identification] finished, pid=${pid}, sent back: [$data_structure_reference].\n");
+ dbgDumpHash($data_structure_reference, "The hash passed in in the run_on_finish callback:");
+ putAll($testStatuses, $data_structure_reference);
+ dbgDumpHash($testStatuses, "The statuses after merge in the run_on_finish callback:");
+ } else {
+ print "ERROR: Group subprocess [$identification] did not send back anything. Exit code = $exit_code\n";
+ }
+ my $subLogAgain = "$logname-$identification";
+ appendFile($subLogAgain,$logname);
+ }
+ );
+ } else {
+ # Do the global setup:
+ $self->globalSetup(\%globalHash, $log);
+ }
- my %groupExecuted;
+ my $localStartAt = $startat;
foreach my $group (@{$cfg->{'groups'}}) {
-
- print $log "INFO $subName at ".__LINE__.": Running TEST GROUP(".$group->{'name'}.")\n";
+ my $groupName = $group->{'name'};
+
+ my $subLog;
+ my $subLogName;
+ if ($groupForkFactor > 1) {
+ # use group name as the Job id:
+ my $jobId = $groupName;
+
+ $subLogName = "$logname-$jobId";
+ open $subLog, ">$subLogName" or die "FATAL ERROR $0 at ".__LINE__." : Can't open $subLogName, $!\n";
+
+ dbg("**** Logging to [$subLogName].\n");
+ # PARALLEL SECTION START: ===============================================================================
+ $pm->start($jobId) and next;
+ dbg("Started test group job \"$jobId\"\n");
+
+ dbg("Doing setup for test group [$groupName]...\n");
+ # Set the group-specific ID:
+ # NB: note that '$globalHash' here is an object cloned for this subprocess.
+ # So, there is no concurrency issue in using '$globalHash' there:
+ $globalHash{'job-id'} = $globalHash{'job-id'} . "-" . $jobId;
+ # Do the global setup which is specific for *this group*:
+ $self->globalSetup(\%globalHash, $subLog);
+ } else {
+ $subLog = $log;
+ $subLogName = $logname;
+ }
+
+ # Run the group of tests.
+ # NB: the processing of $localStartAt parameter happens only if the groupForkFactor < 1.
+ my $sawStart = $self -> runTestGroup($groupName, $subLog, $confFile, \%globalHash, $group, $runAll, $testsToRun, $testsToMatch, $localStartAt, $testStatuses, $productForkFactor);
+ if ((defined $localStartAt) && $sawStart) {
+ undef $localStartAt;
+ }
+
+ if ($groupForkFactor > 1) {
+ # do the cleanups that are specific for *this group*.
+ dbg("Doing cleanup for test group [$groupName]...\n");
+ # NB: invoke it in such way to emphasize the fact that this method is not virtual:
+ globalCleanupConditionalIf($self, \%globalHash, $subLog);
+ $self->globalCleanup(\%globalHash, $subLog);
+
+ dbg("Finishing test group [$groupName].\n");
+ dbgDumpHash($testStatuses, "The satatuses hash at the fork section end");
+ # NB: send the "testStatuses" hash object reference (which is local to this subprocess) to the parent process:
+ $subLog -> close();
+
+ # TODO: may also consider the #runTestGroup block exit status and use it there.
+ $pm -> finish(0, $testStatuses);
+ # PARALLEL SECTION END. ===============================================================================
+ }
+ } # foreach $group
+
+ if ($groupForkFactor > 1) {
+ $pm->wait_all_children;
+ } else {
+ # Do the global cleanups:
+ # NB: invoke it in such way to emphasize the fact that this method is not virtual:
+ globalCleanupConditionalIf($self, \%globalHash, $log);
+ $self->globalCleanup(\%globalHash, $log);
+ }
+}
+
+# Service method to conditionally perform the virtual #globalCleanupConditional().
+# The cleanup runs only when the hash carries the marker that is set once
+# #globalSetupConditional() has been executed for it.
+# NB: This sub should be "final" in Java terms,
+# subclasses should not override it.
+# NB: it is called as a plain function with three arguments, so it must not
+# declare an empty "()" prototype.
+#
+# Parameters:
+# self - the driver object
+# globalHash - reference to the flattened global hash
+# log - log file handle
+#
+# Returns:
+# None
+sub globalCleanupConditionalIf {
+    my ($self, $globalHash, $log) = @_;
+    if (defined($globalHash->{$keyGlobalSetupConditionalDone})) {
+        $self->globalCleanupConditional($globalHash, $log);
+    }
+    return;
+}
+
+################################################################################
+# Separated sub to run a test group.
+# Parameters: (same named values from #run(...) sub with the same meaning).
+# Returns: 'true' if the test defined by '$startat' was found, and 'false' otherwise.
+# (If the '$startat' is null, always returns true.)
+sub runTestGroup() {
+ my ($self, $groupName, $subLog, $confFile, $globalHash, $group, $runAll, $testsToRun, $testsToMatch, $startat, $testStatuses, $productForkFactor) = @_;
+
+ my $subName = (caller(0))[3];
+ print $subLog "INFO $subName at ".__LINE__.": Running TEST GROUP(".$groupName.")\n";
+ my $sawstart = !(defined $startat);
- my %groupHash = %globalHash;
- $groupHash{'group'} = $group->{'name'};
+ my %groupHash = %$globalHash;
+ $groupHash{'group'} = $groupName;
# Read the group keys
foreach (keys(%$group)) {
@@ -319,8 +563,10 @@ sub run
$groupHash{$_} = $group->{$_};
}
+ my $groupDuration=0;
+ my $duration=0;
- # Run each test
+ # Run each test in the group:
foreach my $test (@{$group->{'tests'}}) {
# Check if we're supposed to run this one or not.
if (!$runAll) {
@@ -361,17 +607,19 @@ sub run
}
}
}
-
next unless $foundIt;
}
# This is a test, so run it.
my %testHash = %groupHash;
+ my $tmpTestHash = \%testHash;
+
foreach (keys(%$test)) {
$testHash{$_} = $test->{$_};
}
my $testName = $testHash{'group'} . "_" . $testHash{'num'};
+ dbg("################### Executing test [$testName]...\n");
# if ( $groupExecuted{ $group->{'name'} }== 0 ){
# $groupExecuted{ $group->{'name'} }=1;
@@ -386,13 +634,13 @@ sub run
# Check that ignore isn't set for this file, group, or test
if (defined $testHash{'ignore'}) {
- print $log "Ignoring test $testName, ignore message: " .
+ print $subLog "Ignoring test $testName, ignore message: " .
$testHash{'ignore'} . "\n";
next;
}
# Have we not reached the starting point yet?
- if (!$sawstart) {
+ if (!$sawstart) {
if ($testName eq $startat) {
$sawstart = 1;
} else {
@@ -407,7 +655,7 @@ sub run
foreach (keys(%testHash)) {
if (/^depends_on/ && defined($testStatuses->{$testHash{$_}}) &&
$testStatuses->{$testHash{$_}} ne $passedStr) {
- print $log "Skipping test $testName, it depended on " .
+ print $subLog "Skipping test $testName, it depended on " .
"$testHash{$_} which returned a status of " .
"$testStatuses->{$testHash{$_}}\n";
$testStatuses->{$testName} = $dependStr;
@@ -416,14 +664,28 @@ sub run
}
}
if ($skipThisOne) {
- printResults($testStatuses, $log, "Results so far");
- next;
+ if ($productForkFactor > 1) {
+ printResults($testStatuses, $subLog, "Results so far", basename($confFile), $groupName);
+ } else {
+ printResults($testStatuses, $subLog, "Results so far");
+ }
+ next;
}
- print $log "\n******************************************************\n";
- print $log "\nTEST: $confFile::$testName\n";
- print $log "******************************************************\n";
- print $log "Beginning test $testName at " . time . "\n";
+ print $subLog "\n******************************************************\n";
+ print $subLog "\nTEST: $confFile::$testName\n";
+ print $subLog "******************************************************\n";
+ print $subLog "Beginning test $testName at " . time . "\n";
+
+ # At this point we're going to run the test for sure.
+ # So, do the preparation for that, if not yet done:
+ if (!defined($globalHash->{$keyGlobalSetupConditionalDone})) {
+ $self -> globalSetupConditional($globalHash, $subLog);
+ # this preparation should be done only *once* per each $globalHash instance,
+ # so, set special flag to prevent #globalSetupConditional from being executed again:
+ $globalHash->{$keyGlobalSetupConditionalDone} = 'true';
+ }
+
my %dbinfo = (
'testrun_id' => $testHash{'trid'},
'test_type' => $testHash{'driver'},
@@ -436,11 +698,11 @@ sub run
my $endTime = 0;
my ($testResult, $benchmarkResult);
eval {
- $testResult = $self->runTest(\%testHash, $log);
+ $testResult = $self->runTest(\%testHash, $subLog);
$endTime = time;
- $benchmarkResult = $self->generateBenchmark(\%testHash, $log);
+ $benchmarkResult = $self->generateBenchmark(\%testHash, $subLog);
my $result =
- $self->compare($testResult, $benchmarkResult, $log, \%testHash);
+ $self->compare($testResult, $benchmarkResult, $subLog, \%testHash);
$msg = "INFO: $subName() at ".__LINE__.":Test $testName";
if ($result eq $self->{'wrong_execution_mode'}) {
@@ -457,24 +719,21 @@ sub run
}
$msg= "$msg at " . time . "\n";
#print $msg;
- print $log $msg;
+ print $subLog $msg;
$duration = $endTime - $beginTime;
$dbinfo{'duration'} = $duration;
$self->recordResults($result, $testResult
- , $benchmarkResult, \%dbinfo, $log);
-
+ , $benchmarkResult, \%dbinfo, $subLog);
};
-
if ($@) {
$msg= "ERROR $subName at : ".__LINE__." Failed to run test $testName <$@>\n";
#print $msg;
- print $log $msg;
+ print $subLog $msg;
$testStatuses->{$testName} = $abortedStr;
$dbinfo{'duration'} = $duration;
}
-
eval {
$dbinfo{'status'} = $testStatuses->{$testName};
if($dbh) {
@@ -487,27 +746,26 @@ sub run
}
$self->cleanup($testStatuses->{$testName}, \%testHash, $testResult,
- $benchmarkResult, $log);
- #$report->testcase( $group->{'name'}, $testName, $duration, $msg, $testStatuses->{$testName}, $testResult ) if ( $report );
- $report->testcase( $group->{'name'}, $testName, $duration, $msg, $testStatuses->{$testName} ) if ( $report );
- $groupDuration = $groupDuration + $duration;
- $totalDuration = $totalDuration + $duration;
- printResults( $testStatuses, $log, "Results so far" );
- }
-
- if ( $report ) {
- $report->systemOut( $logname, $group->{'name'});
- printGroupResultsXml( $report, $group->{'name'}, $testStatuses, $groupDuration );
- }
- $report = 0;
- $groupDuration=0;
-
+ $benchmarkResult, $subLog);
+ #$report->testcase( $group->{'name'}, $testName, $duration, $msg, $testStatuses->{$testName}, $testResult ) if ( $report );
+ $report->testcase( $group->{'name'}, $testName, $duration, $msg, $testStatuses->{$testName} ) if ( $report );
+ $groupDuration = $groupDuration + $duration;
+ if ($productForkFactor > 1) {
+ printResults( $testStatuses, $subLog, "Results so far", basename($confFile), $groupName );
+ } else {
+ printResults( $testStatuses, $subLog, "Results so far" );
+ }
+ } # for each test
+
+ if ( $report ) {
+ $report->systemOut( $subLogName, $group->{'name'});
+ printGroupResultsXml( $report, $group->{'name'}, $testStatuses, $groupDuration );
+ }
+ $report = 0;
+ return $sawstart;
+}
- }
- # Do the global cleanup
- $self->globalCleanup(\%globalHash, $log);
-}
# TODO These should be removed
Modified: pig/trunk/test/e2e/harness/test_harness.pl
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/harness/test_harness.pl?rev=1404440&r1=1404439&r2=1404440&view=diff
==============================================================================
--- pig/trunk/test/e2e/harness/test_harness.pl (original)
+++ pig/trunk/test/e2e/harness/test_harness.pl Thu Nov 1 01:03:20 2012
@@ -62,11 +62,11 @@
use strict;
use File::Path;
+use File::Basename;
use Getopt::Long;
use Cwd;
-
# Var: $ROOT
# The root directory for the harness.
#
@@ -289,6 +289,7 @@ if ( -e "$harnessCfg" ) {
$globalCfg = readCfg("$harnessCfg");
$globalCfg->{'harnessCfg'} = $harnessCfg;
+ TestDriver::dbg("Hadoop mapred local dir defined to be [" . $globalCfg->{'hadoop.mapred.local.dir'} . "]\n");
} else {
die "FATAL ERROR: $0 at ".__LINE__." - Configuration file <$harnessCfg> does NOT exist\n";
}
@@ -386,7 +387,7 @@ if ($deploy) {
# Copy the global config into our cfg
foreach(keys(%$globalCfg)) {
next if $_ eq 'file';
- $cfg->{$_} = $globalCfg->{$_}; #foreach(keys(%$globalCfg));
+ $cfg->{$_} = $globalCfg->{$_};
}
# Instantiate the TestDeployer
@@ -465,28 +466,124 @@ if($dblog) {
print $log "Testrun id $globalCfg->{'trid'}\n";
}
-
my %testStatuses;
+
+my $forkFactor = int($ENV{'FORK_FACTOR_FILE'});
+
+# NB: check if the group fork factor >1 and $startat is defined: such combination is not supported
+# because in such case several groups are started simultaneously (in parallel):
+my $groupForkFactor = int($ENV{'FORK_FACTOR_GROUP'});
+if (($groupForkFactor > 1) && (defined $startat)) {
+ die "ERROR: '--startat' (or '-st') option is not supported when the group fork (parallel) factor > 1 (env. variable FORK_FACTOR_GROUP).\n";
+}
+
+if ($forkFactor > 1 || $groupForkFactor > 1) {
+ print "Configuration file fork factor: $forkFactor\n";
+ print "Group fork factor: $groupForkFactor\n";
+}
+
+my $pm;
+if ($forkFactor > 1) {
+ print $log "Configuration file fork factor: $forkFactor\n";
+ $pm = new Parallel::ForkManager($forkFactor);
+ # this is a method that will run in the main process on each job subprocess completion:
+ $pm -> run_on_finish (
+ sub {
+ my ($pid, $exit_code, $identification, $exit_signal, $core_dump, $data_structure_reference) = @_;
+ if (defined($data_structure_reference)) {
+ TestDriver::dbg("Subprocess [$identification] finished, pid=$pid, sent back: $data_structure_reference.\n");
+ TestDriver::putAll(\%testStatuses, $data_structure_reference);
+ } else {
+ print "ERROR: Subprocess [$identification] did not send back anything. Exit code = $exit_code\n";
+ }
+ my $subLogAgain = "$logfile-$identification";
+ TestDriver::appendFile($subLogAgain,$logfile);
+ }
+ );
+}
+
foreach my $arg (@ARGV) {
- print $log "INFO: $0 at ".__LINE__." : Loading configuration file $arg\n";
- my $cfg = readCfg($arg);
- # Copy contents of global config file into hash.
- foreach(keys(%$globalCfg)) {
- next if $_ eq 'file';
- $cfg->{$_} = $globalCfg->{$_}; # foreach(keys(%$globalCfg));
- print $log "\nINFO $0: $_=".$cfg->{$_};
- }
- print $log "\n";
+ my $cfg = readCfg($arg);
- my $driver = TestDriverFactory::getTestDriver($cfg);
- die "FATAL: $0: Driver does not exist\n" if ( !$driver );
- $driver->run(\@testgroups, \@testMatches, $cfg, $log, $dbh, \%testStatuses, $arg, $startat, $logfile);
+ my $subLog;
+ my $subLogName;
+ # basename of the .conf file (like "cmdline.conf")
+ # which is unique identifier for this loop body:
+ my $jobId = basename($arg);
+ $cfg->{'job-id'} = $jobId;
+ if ($forkFactor > 1) {
+ #$jobId = basename($arg); # basename of the .conf file (like "cmdline.conf")
+ $subLogName = "$logfile-$jobId";
+ open $subLog, ">$subLogName" or die "FATAL ERROR $0 at ".__LINE__." : Can't open $subLogName, $!\n";
+ # PARALLEL SECTION START: ===============================================================================
+ $pm->start($jobId) and next;
+ TestDriver::dbg("Started configuration file job \"$jobId\"\n");
+ } else {
+ $subLog = $log;
+ $subLogName = $logfile;
+ }
+
+ print $subLog "INFO: $0 at ".__LINE__." : Loading configuration file $arg\n";
+ # Copy contents of global config file into hash.
+ foreach(keys(%$globalCfg)) {
+ next if $_ eq 'file';
+ $cfg->{$_} = $globalCfg->{$_}; # foreach(keys(%$globalCfg));
+ print $subLog "\nINFO $0: $_=".$cfg->{$_};
+ }
+ print $subLog "\n";
+
+ my $driver = TestDriverFactory::getTestDriver($cfg);
+ die "FATAL: $0: Driver does not exist\n" if ( !$driver );
+
+ # Run the driver inside eval so that a die() raised while processing this
+ # configuration file is caught and reported here:
+ eval
+ {
+ $driver->run(\@testgroups, \@testMatches, $cfg, $subLog, $dbh, \%testStatuses, $arg, $startat, $subLogName);
+ };
+ my $runStatus = $@;
+ # NB: $? is not set by eval (it holds the status of the last child process,
+ # pipe close or system() call), so derive the exit code from the eval error:
+ my $runExitCode = $runStatus ? 1 : 0;
+ if ($runStatus) {
+ print "ERROR: driver->run() returned the following error message [$runStatus].";
+ }
+
+ if ($forkFactor > 1) {
+ TestDriver::dbg("finishing config job [$jobId].\n");
+ $subLog -> close();
+ # NB: use run() exit code as the subprocess exit code:
+ # NB: send the "testStatuses" hash object reference (which is local to this subprocess) to the parent process:
+ $pm -> finish($runExitCode, \%testStatuses);
+ # PARALLEL SECTION END. ===============================================================================
+ }
}
+
+if ($forkFactor > 1) {
+ TestDriver::dbg("Waiting for the subprocesses...\n");
+ $pm->wait_all_children;
+ TestDriver::dbg("All subprocesses finished.\n");
+ # NB: in case of parallel execution we must reopen the $log descriptor
+ # because we appended to that file in pm#run_on_finish() sub.
+ # Three-argument open keeps the file name from being parsed as a mode, and
+ # the result is checked so that later prints do not go to a closed handle:
+ open $log, '>>', $logfile or die "FATAL ERROR $0 at ".__LINE__." : Can't reopen $logfile, $!\n";
+}
+
$dbh->endTestRun($globalCfg->{'trid'}) if ($dblog);
-# don't remove the space after Final results, it matters.
-TestDriver::printResults(\%testStatuses, $log, "Final results ");
-print $log "Finished test run at " . time . "\n";
+# cleanup temporary Hadoop directories
+if( ($groupForkFactor>1 || $forkFactor>1)
+ && defined($globalCfg->{'hadoop.mapred.local.dir'})
+ && $globalCfg->{'exectype'} eq "local") {
+ TestDriver::dbg("Deleting temporary hadoop directories for local exec mode: [" . $globalCfg->{'hadoop.mapred.local.dir'} . "].\n");
+ rmtree( $globalCfg->{'hadoop.mapred.local.dir'} );
+}
+
+# don't remove the space after "Final results", it matters.
+if ($forkFactor > 1) {
+ TestDriver::printResults(\%testStatuses, $log, "Final results ", "", "");
+} else {
+ TestDriver::printResults(\%testStatuses, $log, "Final results ");
+}
+my $finishStr = "Finished test run at " . time . "\n";
+print $log $finishStr;
+TestDriver::dbg($finishStr);
# If they have requested undeployment, do it now
if ($undeploy) {
Modified: pig/trunk/test/e2e/pig/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/build.xml?rev=1404440&r1=1404439&r2=1404440&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/build.xml (original)
+++ pig/trunk/test/e2e/pig/build.xml Thu Nov 1 01:03:20 2012
@@ -245,6 +245,12 @@
<target name="test-base" depends="property-check, udfs, tar, init-test">
<!-- If they have not specified tests to run then null it out -->
<property name="tests.to.run" value=""/>
+ <!-- fork (parallelization) factors for e2e tests execution.
+ Defaults are 1, which means *no* parallelization: -->
+ <property name="fork.factor.group" value="1"/>
+ <property name="fork.factor.conf.file" value="1"/>
+ <property name="hadoop.mapred.local.dir" value="/tmp/hadoop/mapred/local"/>
+ <property name="e2e.debug" value="false"/>
<exec executable="./test_harness.pl" dir="${test.location}" failonerror="true">
<env key="HARNESS_ROOT" value="."/>
@@ -272,6 +278,11 @@
<env key="PH_BENCHMARK_CACHE_PATH" value="${PH_BENCHMARK_CACHE_PATH}"/>
<env key="HCAT_BIN" value="${hcat.bin}"/>
<env key="PERL5LIB" value="${harness.PERL5LIB}"/>
+ <env key="FORK_FACTOR_GROUP" value="${fork.factor.group}"/>
+ <env key="FORK_FACTOR_FILE" value="${fork.factor.conf.file}"/>
+ <env key="HADOOP_MAPRED_LOCAL_DIR" value="${hadoop.mapred.local.dir}"/>
+ <env key="E2E_DEBUG" value="${e2e.debug}"/>
+
<arg line="${tests.to.run}"/>
<arg value="${test.location}/tests/cmdline.conf"/>
<arg value="${test.location}/tests/multiquery.conf"/>
Modified: pig/trunk/test/e2e/pig/conf/local.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/conf/local.conf?rev=1404440&r1=1404439&r2=1404440&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/conf/local.conf (original)
+++ pig/trunk/test/e2e/pig/conf/local.conf Thu Nov 1 01:03:20 2012
@@ -58,6 +58,8 @@ $cfg = {
, 'userhomePath' => "$ENV{HOME}"
,'local.bin' => '/usr/bin'
+ # NB: we provide the default there as a fallback in case if there is no corresponding ENV key:
+ ,'hadoop.mapred.local.dir' => defined $ENV{'HADOOP_MAPRED_LOCAL_DIR'} ? $ENV{'HADOOP_MAPRED_LOCAL_DIR'} : "/tmp/hadoop/mapred/local"
,'logDir' => "$ENV{PH_OUT}/log"
,'propertiesFile' => "./conf/testpropertiesfile.conf"
Modified: pig/trunk/test/e2e/pig/deployers/ExistingClusterDeployer.pm
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/deployers/ExistingClusterDeployer.pm?rev=1404440&r1=1404439&r2=1404440&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/deployers/ExistingClusterDeployer.pm (original)
+++ pig/trunk/test/e2e/pig/deployers/ExistingClusterDeployer.pm Thu Nov 1 01:03:20 2012
@@ -68,15 +68,15 @@ sub checkPrerequisites
# They must have declared the conf directory for their Hadoop installation
if (! defined $cfg->{'hadoopconfdir'} || $cfg->{'hadoopconfdir'} eq "") {
print $log "You must set the key 'hadoopconfdir' to your Hadoop conf directory "
- . "in existing.conf\n";
- die "hadoopconfdir is not set in existing.conf\n";
+ . "in existing_deployer.conf\n";
+ die "hadoopconfdir is not set in existing_deployer.conf\n";
}
# They must have declared the executable path for their Hadoop installation
if (! defined $cfg->{'hadoopbin'} || $cfg->{'hadoopbin'} eq "") {
print $log "You must set the key 'hadoopbin' to your Hadoop bin path"
- . "in existing.conf\n";
- die "hadoopbin is not set in existing.conf\n";
+ . "in existing_deployer.conf\n";
+ die "hadoopbin is not set in existing_deployer.conf\n";
}
# Run a quick and easy Hadoop command to make sure we can
@@ -340,7 +340,7 @@ sub undeploy
#
sub confirmUndeployment
{
- die "$0 INFO : confirmUndeployment is a virtual function!";
+ # TODO: implement a correct confirmation, but let's not die there.
}
# TODO
Modified: pig/trunk/test/e2e/pig/drivers/TestDriverPig.pm
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/drivers/TestDriverPig.pm?rev=1404440&r1=1404439&r2=1404440&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/drivers/TestDriverPig.pm (original)
+++ pig/trunk/test/e2e/pig/drivers/TestDriverPig.pm Thu Nov 1 01:03:20 2012
@@ -104,13 +104,13 @@ sub replaceParameters
sub globalSetup
{
my ($self, $globalHash, $log) = @_;
- my $subName = (caller(0))[3];
-
# Setup the output path
my $me = `whoami`;
chomp $me;
- $globalHash->{'runid'} = $me . "." . time;
+ my $jobId = $globalHash->{'job-id'};
+ my $timeId = time;
+ $globalHash->{'runid'} = $me . "-" . $timeId . "-" . $jobId;
# if "-ignore false" was provided on the command line,
# it means do run tests even when marked as 'ignore'
@@ -121,51 +121,61 @@ sub globalSetup
$globalHash->{'outpath'} = $globalHash->{'outpathbase'} . "/" . $globalHash->{'runid'} . "/";
$globalHash->{'localpath'} = $globalHash->{'localpathbase'} . "/" . $globalHash->{'runid'} . "/";
+ $globalHash->{'tmpPath'} = $globalHash->{'tmpPath'} . "/" . $globalHash->{'runid'} . "/";
+}
+
+# Prepends the harness script path to PATH and creates the directories a test
+# run needs:
+#   - $globalHash->{'outpath'}, through the Pig command line ('-e mkdir');
+#   - $globalHash->{'localpath'} and $globalHash->{'tmpPath'}, through 'mkdir -p';
+#   - "tmp/<runid>", again through the Pig command line.
+#
+# Arguments:
+#   $self       - the driver object; only getPigCmd() is called on it here
+#   $globalHash - hash ref; the keys 'scriptPath', 'outpath', 'localpath',
+#                 'tmpPath' and 'runid' are read
+#   $log        - file handle that receives the "Going to run" trace lines
+#                 and the output of the commands
+#
+# Dies as soon as one of the directories cannot be created.
+sub globalSetupConditional() {
+    my ($self, $globalHash, $log) = @_;
     # add libexec location to the path
     if (defined($ENV{'PATH'})) {
         $ENV{'PATH'} = $globalHash->{'scriptPath'} . ":" . $ENV{'PATH'};
-    }
-    else {
+    } else {
         $ENV{'PATH'} = $globalHash->{'scriptPath'};
     }
     my @cmd = ($self->getPigCmd($globalHash, $log), '-e', 'mkdir', $globalHash->{'outpath'});
-
-
     print $log "Going to run " . join(" ", @cmd) . "\n";
-    IPC::Run::run(\@cmd, \undef, $log, $log) or die "Cannot create HDFS directory " . $globalHash->{'outpath'} . ": $? - $!\n";
+    IPC::Run::run(\@cmd, \undef, $log, $log) or die "$0 at ".__LINE__.": Cannot create HDFS directory " . $globalHash->{'outpath'} . ": $? - $!\n";
     IPC::Run::run(['mkdir', '-p', $globalHash->{'localpath'}], \undef, $log, $log) or
-        die "Cannot create localpath directory " . $globalHash->{'localpath'} .
-        " " . "$ERRNO\n";
+        die "$0 at ".__LINE__.": Cannot create localpath directory [" . $globalHash->{'localpath'} .
+        "]: " . "$ERRNO\n";
     # Create the temporary directory
     IPC::Run::run(['mkdir', '-p', $globalHash->{'tmpPath'}], \undef, $log, $log) or
-        die "Cannot create temporary directory " . $globalHash->{'tmpPath'} .
-        " " . "$ERRNO\n";
+        die "$0 at ".__LINE__.": Cannot create temporary directory [" . $globalHash->{'tmpPath'} .
+        "]: " . "$ERRNO\n";
     # Create the HDFS temporary directory
     @cmd = ($self->getPigCmd($globalHash, $log), '-e', 'mkdir', "tmp/$globalHash->{'runid'}");
     print $log "Going to run " . join(" ", @cmd) . "\n";
-    IPC::Run::run(\@cmd, \undef, $log, $log) or die "Cannot create HDFS directory " . $globalHash->{'outpath'} . ": $? - $!\n";
+    IPC::Run::run(\@cmd, \undef, $log, $log) or die "$0 at ".__LINE__.": Cannot create HDFS directory " . "tmp/$globalHash->{'runid'}" . ": $? - $!\n";
 }
-sub globalCleanup
+sub globalCleanup()
{
+ # Intentionally a no-op: the temporary directories created in globalSetupConditional() are
+ # removed by globalCleanupConditional() instead.
+}
+
+sub globalCleanupConditional() {
my ($self, $globalHash, $log) = @_;
+ # Removes the local temporary directory ('tmpPath') and the HDFS "tmp/<runid>" directory of this run.
+ # NB: the local and HDFS output directories are deliberately kept, because that data may be needed to investigate test failures.
+
IPC::Run::run(['rm', '-rf', $globalHash->{'tmpPath'}], \undef, $log, $log) or
- warn "Cannot remove temporary directory " . $globalHash->{'tmpPath'} .
- " " . "$ERRNO\n";
+ warn "Cannot remove temporary directory " . $globalHash->{'tmpPath'} .
+ " " . "$ERRNO\n";
# Clean up the HDFS temporary directory; unlike the local removal above (which only warns), a failure here dies.
my @cmd = ($self->getPigCmd($globalHash, $log), '-e', 'fs', '-rmr', "tmp/$globalHash->{'runid'}");
- print $log "Going to run " . join(" ", @cmd) . "\n";
- IPC::Run::run(\@cmd, \undef, $log, $log) or die "Cannot create HDFS directory " . $globalHash->{'outpath'} . ": $? - $!\n";
+ print $log "Going to run: [" . join(" ", @cmd) . "]\n";
+ IPC::Run::run(\@cmd, \undef, $log, $log)
+ or die "$0 at ".__LINE__.": Cannot remove HDFS directory " . "tmp/$globalHash->{'runid'}" . ": $? - $!\n";
}
-
sub runTest
{
my ($self, $testCmd, $log) = @_;
@@ -337,6 +347,17 @@ sub runScript
return \%result;
}
+# Returns a per-process Hadoop local directory for parallel runs, or undef.
+#
+# A path is returned only when the test configuration defines
+# 'hadoop.mapred.local.dir' and at least one of the environment variables
+# FORK_FACTOR_GROUP / FORK_FACTOR_FILE is greater than 1. The path is that
+# directory with the current process id appended as a sub-directory.
+#
+# Arguments:
+#   $self    - the driver object (not used by this sub)
+#   $testCmd - hash ref holding the test configuration
+#
+# Returns the directory path as a string, or undef when the conditions above
+# are not met.
+sub hadoopLocalTmpDir($$)
+{
+    my ($self, $testCmd) = @_;
+
+    my $baseDir = $testCmd->{'hadoop.mapred.local.dir'};
+    return undef unless defined($baseDir);
+
+    # The fork factors are optional: treat an unset (or empty) variable as 0
+    # instead of handing undef to int(), which warns under "use warnings".
+    my $forkFactorGroup = int($ENV{'FORK_FACTOR_GROUP'} || 0);
+    my $forkFactorFile  = int($ENV{'FORK_FACTOR_FILE'} || 0);
+    return undef unless ($forkFactorGroup > 1 || $forkFactorFile > 1);
+
+    return $baseDir . "/" . $PID;
+}
sub getPigCmd($$$)
{
@@ -359,13 +380,25 @@ sub getPigCmd($$$)
push(@pigCmd, '-Dpig.additional.jars='.$testCmd->{'additionaljars'});
}
+ my $additionalJavaParams = undef;
if ($testCmd->{'exectype'} eq "local") {
- push(@{$testCmd->{'java_params'}}, "-Xmx1024m");
+ $additionalJavaParams = "-Xmx1024m";
+ my $hadoopTmpDir = $self->hadoopLocalTmpDir($testCmd);
+ if (defined($hadoopTmpDir)) {
+ $additionalJavaParams .= " -Dmapred.local.dir=$hadoopTmpDir -Dmapreduce.cluster.local.dir=$hadoopTmpDir";
+ }
+ TestDriver::dbg("Additional java parameters: [$additionalJavaParams].\n");
+
push(@pigCmd, ("-x", "local"));
}
- if (defined($testCmd->{'java_params'})) {
- $ENV{'PIG_OPTS'} = join(" ", @{$testCmd->{'java_params'}});
+ if (defined($testCmd->{'java_params'}) || defined($additionalJavaParams)) {
+ if (defined($testCmd->{'java_params'})) {
+ $ENV{'PIG_OPTS'} = join(" ", @{$testCmd->{'java_params'}}, $additionalJavaParams);
+ } else {
+ $ENV{'PIG_OPTS'} = $additionalJavaParams;
+ }
+ TestDriver::dbg("PIG_OPTS set to be: [$ENV{'PIG_OPTS'}].\n");
} else {
$ENV{'PIG_OPTS'} = undef;
}