You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2012/11/01 02:00:44 UTC

svn commit: r1404439 - in /pig/branches/branch-0.11: ./ test/e2e/harness/ test/e2e/pig/ test/e2e/pig/conf/ test/e2e/pig/deployers/ test/e2e/pig/drivers/

Author: rohini
Date: Thu Nov  1 01:00:43 2012
New Revision: 1404439

URL: http://svn.apache.org/viewvc?rev=1404439&view=rev
Log:
PIG-2898: Parallel execution of e2e tests (iveselovsky via rohini)

Modified:
    pig/branches/branch-0.11/CHANGES.txt
    pig/branches/branch-0.11/test/e2e/harness/TestDriver.pm
    pig/branches/branch-0.11/test/e2e/harness/test_harness.pl
    pig/branches/branch-0.11/test/e2e/pig/build.xml
    pig/branches/branch-0.11/test/e2e/pig/conf/local.conf
    pig/branches/branch-0.11/test/e2e/pig/deployers/ExistingClusterDeployer.pm
    pig/branches/branch-0.11/test/e2e/pig/drivers/TestDriverPig.pm

Modified: pig/branches/branch-0.11/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/CHANGES.txt?rev=1404439&r1=1404438&r2=1404439&view=diff
==============================================================================
--- pig/branches/branch-0.11/CHANGES.txt (original)
+++ pig/branches/branch-0.11/CHANGES.txt Thu Nov  1 01:00:43 2012
@@ -28,6 +28,8 @@ PIG-1891 Enable StoreFunc to make intell
 
 IMPROVEMENTS
 
+PIG-2898: Parallel execution of e2e tests (iveselovsky via rohini)
+
 PIG-2913: org.apache.pig.test.TestPigServerWithMacros fails sometimes because it picks up previous minicluster configuration file (cheolsoo via julien)
 
 PIG-2976: Reduce HBaseStorage logging (billgraham)

Modified: pig/branches/branch-0.11/test/e2e/harness/TestDriver.pm
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/test/e2e/harness/TestDriver.pm?rev=1404439&r1=1404438&r2=1404439&view=diff
==============================================================================
--- pig/branches/branch-0.11/test/e2e/harness/TestDriver.pm (original)
+++ pig/branches/branch-0.11/test/e2e/harness/TestDriver.pm Thu Nov  1 01:00:43 2012
@@ -24,6 +24,10 @@ package TestDriver;
 use TestDriverFactory;
 use TestReport;
 use File::Path;
+use Parallel::ForkManager;
+use FileHandle;
+use File::Copy;
+use File::Basename;
 
 my $passedStr = 'passed';
 my $failedStr = 'failed';
@@ -31,6 +35,27 @@ my $abortedStr = 'aborted';
 my $skippedStr = 'skipped';
 my $dependStr = 'failed_dependency';
 
+# A constant to be used as a key in hash:
+my $keyGlobalSetupConditionalDone = 'globalSetupConditionaldone';
+
+################################################################################
+# Sub: appendToLength
+# static mathod to make a string not shorter than N characters in length: 
+# appends spaces unlit the desired length achieved.
+#
+# Paramaters:
+# str - the string
+# len - the min deired length 
+#
+# Returns: the modified string
+#
+sub appendToLength($$) {
+   my ($str, $len) = @_; 
+   while (length($str) < $len) {
+      $str .= " ";
+   }   
+   return $str;
+}
 
 ##############################################################################
 #  Sub: printResults
@@ -47,7 +72,7 @@ my $dependStr = 'failed_dependency';
 #
 sub printResults
 {
-	my ($testStatuses, $log, $prefix) = @_;
+	my ($testStatuses, $log, $prefix, $confFile, $groupName) = @_;
 
 	my ($pass, $fail, $abort, $depend, $skipped) = (0, 0, 0, 0, 0);
 
@@ -59,11 +84,86 @@ sub printResults
 		($testStatuses->{$_} eq $skippedStr) && $skipped++;
 	}
 
-	my $msg = "$prefix, PASSED: $pass FAILED: $fail SKIPPED: $skipped ABORTED: $abort "
-		. "FAILED DEPENDENCY: $depend";
+        my $context = "";
+        if (defined($confFile) && (length(confFile) > 0)) {
+           $context = " [$confFile-$groupName]"; 
+        } 
+        # XXX why comma is added there?
+	my $msg = appendToLength($prefix . ",", 18) 
+            . " PASSED: " . appendToLength($pass,4) 
+            . " FAILED: " . appendToLength($fail,4) 
+            . " SKIPPED: " . appendToLength($skipped,4) 
+            . " ABORTED: " . appendToLength($abort,4) 
+            . " FAILED DEPENDENCY: " . appendToLength($depend,4)
+            . $context;
 	print $log "$msg\n";
- 	print "$msg\n";
-         
+ 	print "$msg\n";      
+}
+
+##############################################################################
+# Puts all the k-v pairs from sourceHash to the targetHash.
+# Returns: void
+# parameters: 
+#   1: targetHash, 
+#   2: sourceHash.
+sub putAll($$)
+{
+    my ($targetHash, $sourceHash) = @_;
+    while (my ($key, $value) = each(%$sourceHash)) {
+        $targetHash->{ $key } = $value;
+    }
+}
+
+##############################################################################
+# appends one file to another.
+# parameters: 
+#   1: sourceFileName, 
+#   2: targetFileName.
+# Returns: void
+sub appendFile($$) {
+    my ($source, $target) = @_;
+    dbg("Appending file [" . Cwd::realpath($source) . "] >> [" . Cwd::realpath($target) . "]\n");
+    $sourceHandle = FileHandle->new("<$source");
+    if (! defined $sourceHandle) {
+        die "Cannot open source file [$source].";
+    }
+    $targetHandle = FileHandle->new(">>$target");
+    if (defined $targetHandle) {
+        copy($sourceHandle, $targetHandle);
+        $targetHandle->close(); 
+        $sourceHandle->close(); 
+    } else {
+        die "cannot open target file [$target].";
+    }
+}
+
+##############################################################################
+# Diagnostic sub to print a hash contents
+# Paramaters: 
+#   1: the hash reference;
+# Returns: void
+sub dbgDumpHash($;$)
+{
+    if ($ENV{'E2E_DEBUG'} eq 'true') {
+       my ($myhash, $msg) = @_;
+       print "Dump of hash $msg:\n";
+       while (my ($key, $value) = each(%$myhash)) {
+           print "  [$key] = [$value]\n";
+       }
+    }
+}
+
+##############################################################################
+# Diagnostic sub to print a debug output.
+# This is useful when debugging the harness perl scripts.
+# Paramaters: 
+#   1*: object(s) to be printed, typically one string;
+# Returns: void
+sub dbg(@) 
+{
+    if ($ENV{'E2E_DEBUG'} eq 'true') {
+       print @_;
+    }
 }
 
 ##############################################################################
@@ -97,7 +197,6 @@ sub printGroupResultsXml
 
         my $total= $pass + $fail + $abort;
         $report->totals( $groupName, $total, $fail, $abort, $totalDuration );
-
 }
 
 ##############################################################################
@@ -124,10 +223,13 @@ sub new
 
 ##############################################################################
 #  Sub: globalSetup
-# Set up before any tests are run.  This gives each individual driver a chance to do
+# Set up before any tests from the group are run.  This gives each individual driver a chance to do
 # setup.  This function will only be called once, before all the tests are
 # run.  A driver need not implement it.  It is a virtual function.
 #
+# This method invoked unconditionally (always), even if there are no test to run in the group. See
+# also #globalSetupConditional() description. 
+#
 # Paramaters:
 # globalHash - Top level hash from config file (does not have any group
 # or test information in it).
@@ -140,11 +242,32 @@ sub globalSetup
 {
 }
 
+##############################################################################
+#  Sub: globalSetupConditional 
+# Set up before any tests from the test config file (in sequential mode) or test group (in parallel mode) are run. 
+# Executes after #globalSetup(). Executes *only* if there is at least one test to run. Introduced for performance
+# optimization in parallel execution mode.
+# It is a virtual function. Subclasses may override it.
+#
+# Paramaters:
+# globalHash - Top level hash from config file (does not have any group
+# or test information in it).
+# log - log file handle
+#
+# Returns:
+# None
+#
+sub globalSetupConditional
+{
+}
+
 ###############################################################################
 # Sub: globalCleanup
 # Clean up after all tests have run.  This gives each individual driver a chance to do
 # cleanup.  This function will only be called once, after all the tests are
 # run.  A driver need not implement it.  It is a virtual function.
+# This method invoked unconditionally, even if no test in the group was executed. 
+# See #globalCleanupConditional() method description.
 #
 # Paramaters:
 # globalHash - Top level hash from config file (does not have any group
@@ -158,6 +281,22 @@ sub globalCleanup
 }
 
 ###############################################################################
+# Sub: globalCleanupConditional
+# Clean up after all tests have run, before #globalCleanup(). Invoked iff #globalSetupConditional()
+# was previously invoked gor this config file (sequential mode) or test group (parallel mode).
+# It is a virtual function. Subclasses may override it.
+#
+# Paramaters:
+# globalHash - Top level hash from config file (does not have any group
+# or test information in it).
+# log - log file handle
+#
+# Returns:
+# None
+sub globalCleanupConditional() {
+}
+
+###############################################################################
 # Sub: runTest
 # Run a test.  This is a pure virtual function.
 #
@@ -280,12 +419,8 @@ sub run
 	my ($self, $testsToRun, $testsToMatch, $cfg, $log, $dbh, $testStatuses,
 		$confFile, $startat, $logname ) = @_;
 
-        my $subName      = (caller(0))[3];
-        my $msg="";
-        my $duration=0;
-        my $totalDuration=0;
-        my $groupDuration=0;
-	my $sawstart = !(defined $startat);
+    my $subName = (caller(0))[3];
+    my $msg="";
 	# Rather than make each driver handle our multi-level cfg, we'll flatten
 	# the hashes into one for it.
 	my %globalHash;
@@ -298,20 +433,129 @@ sub run
 		$globalHash{$_} = $cfg->{$_};
 	}
 
-	$globalHash{$_} = $cfg->{$_};
-	# Do the global setup
-	$self->globalSetup(\%globalHash, $log);
+    my $report=0;
+    my $properties= new Properties(0, $globalHash{'propertiesFile'});
 
-        my $report=0;
-        my $properties= new Properties(0, $globalHash{'propertiesFile'});
+    my $fileForkFactor = int($ENV{'FORK_FACTOR_FILE'});
+    my $groupForkFactor = int($ENV{'FORK_FACTOR_GROUP'});
+    # NB: this is to distinguish the sequential mode from parallel one: 
+    my $productForkFactor = $fileForkFactor * $groupForkFactor;
+    my $pm;
+    if ($groupForkFactor > 1) {
+        print $log "Group fork factor: $groupForkFactor\n";
+        # Create the fork manager:
+        $pm = new Parallel::ForkManager($groupForkFactor);
+        # this is a callback method that will run in the main process on each job subprocess completion:
+        $pm -> run_on_finish (
+           sub {
+              my ($pid, $exit_code, $identification, $exit_signal, $core_dump, $data_structure_reference) = @_; 
+              # see what the child sent us, if anything
+              if (defined($data_structure_reference)) { 
+                  dbg("Group subprocess [$identification] finished, pid=${pid}, sent back: [$data_structure_reference].\n");
+                  dbgDumpHash($data_structure_reference, "The hash passed in in the run_on_finish callback:");
+                  putAll($testStatuses, $data_structure_reference);
+                  dbgDumpHash($testStatuses, "The statuses after merge in the run_on_finish callback:");
+              } else {
+                  print "ERROR: Group subprocess [$identification] did not send back anything. Exit code = $exit_code\n";
+              }
+              my $subLogAgain = "$logname-$identification";
+              appendFile($subLogAgain,$logname);
+           }
+       );
+    } else {
+    	# Do the global setup:
+    	$self->globalSetup(\%globalHash, $log);
+    }
 
-        my %groupExecuted;
+    my $localStartAt = $startat;
 	foreach my $group (@{$cfg->{'groups'}}) {
- 
-                print $log "INFO $subName at ".__LINE__.": Running TEST GROUP(".$group->{'name'}.")\n";
+        my $groupName = $group->{'name'};
+
+        my $subLog;
+        my $subLogName;
+        if ($groupForkFactor > 1) {
+            # use group name as the Job id:
+            my $jobId = $groupName;
+
+            $subLogName = "$logname-$jobId";
+            open $subLog, ">$subLogName" or die "FATAL ERROR $0 at ".__LINE__." : Can't open $subLogName, $!\n";
+
+            dbg("**** Logging to [$subLogName].\n");
+            # PARALLEL SECTION START: ===============================================================================
+            $pm->start($jobId) and next;
+            dbg("Started test group job \"$jobId\"\n");
+
+            dbg("Doing setup for test group [$groupName]...\n");
+            # Set the group-specific ID:
+            # NB: note that '$globalHash' here is an object cloned for this subprocess.
+            # So, there is no concurrency issue in using '$globalHash' there:
+            $globalHash{'job-id'} = $globalHash{'job-id'} . "-" . $jobId;
+            # Do the global setup which is specific for *this group*:
+        	$self->globalSetup(\%globalHash, $subLog);
+        } else {
+            $subLog = $log;
+            $subLogName = $logname;
+        }
+
+        # Run the group of tests.
+        # NB: the processing of $localStartAt parameter happens only if the groupForkFactor < 1.
+        my $sawStart = $self -> runTestGroup($groupName, $subLog, $confFile, \%globalHash, $group, $runAll, $testsToRun, $testsToMatch, $localStartAt, $testStatuses, $productForkFactor);
+        if ((defined $localStartAt) && $sawStart) {
+            undef $localStartAt;
+        }
+
+        if ($groupForkFactor > 1) {
+            # do the clanups that are specific for *this group*.
+            dbg("Doing cleanup for test group [$groupName]...\n");
+            # NB: invoke it in such way to emphasize the fact that this method is not virtual:
+            globalCleanupConditionalIf($self, \%globalHash, $subLog);
+            $self->globalCleanup(\%globalHash, $subLog);
+
+            dbg("Finishing test group [$groupName].\n");
+            dbgDumpHash($testStatuses, "The satatuses hash at the fork section end");
+            # NB: send the "testStatuses" hash object reference (which is local to this subprocess) to the parent process: 
+            $subLog -> close();
+
+            # TODO: may also consider the #runTestGroup block exit status and use it there.
+            $pm -> finish(0, $testStatuses);
+            # PARALLEL SECTION END. ===============================================================================
+        }
+	} # foreach $group  
+
+    if ($groupForkFactor > 1) {
+        $pm->wait_all_children;
+    } else {
+    	# Do the global cleanups:
+        # NB: invoke it in such way to emphasize the fact that this method is not virtual:
+        globalCleanupConditionalIf($self, \%globalHash, $log);
+    	$self->globalCleanup(\%globalHash, $log);
+    }
+}
+
+# Servce method to conditionally perform the virtual #globalCleanupConditional().
+# NB: This sub should be "final" in Java terms,
+# subclasses should not override it.
+sub globalCleanupConditionalIf() {
+    my ($self, $globalHash, $log) = @_;
+    if (defined($globalHash->{$keyGlobalSetupConditionalDone})) {
+        $self -> globalCleanupConditional($globalHash, $log);
+    }
+}
+
+################################################################################
+# Separated sub to run a test group.
+# Parameters: (same named values from #run(...) sub with the same meaning).
+# Returns: 'true' if the test defined by '$startat' was found, and 'false' otherwise.
+#   (If the '$startat' is null, always returns true.)   
+sub runTestGroup() {
+        my ($self, $groupName, $subLog, $confFile, $globalHash, $group, $runAll, $testsToRun, $testsToMatch, $startat, $testStatuses, $productForkFactor) = @_;
+
+        my $subName = (caller(0))[3];
+        print $subLog "INFO $subName at ".__LINE__.": Running TEST GROUP(".$groupName.")\n";
+        my $sawstart = !(defined $startat);
                 
-		my %groupHash = %globalHash;
-		$groupHash{'group'} = $group->{'name'};
+		my %groupHash = %$globalHash; 
+		$groupHash{'group'} = $groupName;
 
 		# Read the group keys
 		foreach (keys(%$group)) {
@@ -319,8 +563,10 @@ sub run
 			$groupHash{$_} = $group->{$_};
 		}
 
+        my $groupDuration=0;
+        my $duration=0;
 
-		# Run each test
+		# Run each test in the group:
 		foreach my $test (@{$group->{'tests'}}) {
 			# Check if we're supposed to run this one or not.
 			if (!$runAll) {
@@ -361,17 +607,19 @@ sub run
 						}
 					}
 				}
-
 				next unless $foundIt;
 			}
 
 			# This is a test, so run it.
 			my %testHash = %groupHash;
+            my $tmpTestHash = \%testHash;
+
 			foreach (keys(%$test)) {
 				$testHash{$_} = $test->{$_};
 			}
 
 			my $testName = $testHash{'group'} . "_" . $testHash{'num'};
+            dbg("################### Executing test [$testName]...\n");
 
 #            if ( $groupExecuted{ $group->{'name'} }== 0 ){
 #                $groupExecuted{ $group->{'name'} }=1;
@@ -386,13 +634,13 @@ sub run
 
 			# Check that ignore isn't set for this file, group, or test
 			if (defined $testHash{'ignore'}) {
-				print $log "Ignoring test $testName, ignore message: " .
+				print $subLog "Ignoring test $testName, ignore message: " .
 					$testHash{'ignore'} . "\n";
 				next;
 			}
 
 			# Have we not reached the starting point yet?
-			if (!$sawstart) {
+			if (!$sawstart) { 
 				if ($testName eq $startat) {
 					$sawstart = 1;
 				} else {
@@ -407,7 +655,7 @@ sub run
 			foreach (keys(%testHash)) {
 				if (/^depends_on/ && defined($testStatuses->{$testHash{$_}}) &&
 						$testStatuses->{$testHash{$_}} ne $passedStr) {
-					print $log "Skipping test $testName, it depended on " .
+					print $subLog "Skipping test $testName, it depended on " .
 						"$testHash{$_} which returned a status of " .
 						"$testStatuses->{$testHash{$_}}\n";
 					$testStatuses->{$testName} = $dependStr;
@@ -416,14 +664,28 @@ sub run
 				}
 			}
 			if ($skipThisOne) {
-				printResults($testStatuses, $log, "Results so far");
-				next;
+                            if ($productForkFactor > 1) {
+				printResults($testStatuses, $subLog, "Results so far", basename($confFile), $groupName);
+                            } else {
+				printResults($testStatuses, $subLog, "Results so far");
+                            }
+			    next;
 			}
 
-			print $log "\n******************************************************\n";
-			print $log "\nTEST: $confFile::$testName\n";
-			print $log  "******************************************************\n";
-			print $log "Beginning test $testName at " . time . "\n";
+			print $subLog "\n******************************************************\n";
+			print $subLog "\nTEST: $confFile::$testName\n";
+			print $subLog  "******************************************************\n";
+			print $subLog "Beginning test $testName at " . time . "\n";
+            
+            # At this point we're going to run the test for sure. 
+            # So, do the preparation for that, if not yet done: 
+            if (!defined($globalHash->{$keyGlobalSetupConditionalDone})) {
+                $self -> globalSetupConditional($globalHash, $subLog);
+                # this preparation should be done only *once* per each $globalHash instance,
+                # so, set special flag to prevent #globalSetupConditional from being executed again:
+                $globalHash->{$keyGlobalSetupConditionalDone} = 'true';
+            }
+
 			my %dbinfo = (
 				'testrun_id' => $testHash{'trid'},
 				'test_type' => $testHash{'driver'},
@@ -436,11 +698,11 @@ sub run
 			my $endTime = 0;
 			my ($testResult, $benchmarkResult);
 			eval {
-				$testResult = $self->runTest(\%testHash, $log);
+				$testResult = $self->runTest(\%testHash, $subLog);
 				$endTime = time;
-				$benchmarkResult = $self->generateBenchmark(\%testHash, $log);
+				$benchmarkResult = $self->generateBenchmark(\%testHash, $subLog);
 				my $result =
-					$self->compare($testResult, $benchmarkResult, $log, \%testHash);
+					$self->compare($testResult, $benchmarkResult, $subLog, \%testHash);
 				$msg = "INFO: $subName() at ".__LINE__.":Test $testName";
 
                 if ($result eq $self->{'wrong_execution_mode'}) {
@@ -457,24 +719,21 @@ sub run
 				}
 				$msg= "$msg at " . time . "\n";
 				#print $msg;
-				print $log $msg;
+				print $subLog $msg;
 				$duration = $endTime - $beginTime;
 				$dbinfo{'duration'} = $duration;
 				$self->recordResults($result, $testResult
-                                          , $benchmarkResult, \%dbinfo, $log);
-                                  
+                                          , $benchmarkResult, \%dbinfo, $subLog);
 			};
 
-
 			if ($@) {
 				$msg= "ERROR $subName at : ".__LINE__." Failed to run test $testName <$@>\n";
 				#print $msg;
-				print $log $msg;
+				print $subLog $msg;
 				$testStatuses->{$testName} = $abortedStr;
 				$dbinfo{'duration'} = $duration;
 			}
 
-
 			eval {
 				$dbinfo{'status'} = $testStatuses->{$testName};
 				if($dbh) {
@@ -487,27 +746,26 @@ sub run
 			}
 
 			$self->cleanup($testStatuses->{$testName}, \%testHash, $testResult,
-				$benchmarkResult, $log);
-                        #$report->testcase( $group->{'name'}, $testName, $duration, $msg, $testStatuses->{$testName}, $testResult ) if ( $report );
-                        $report->testcase( $group->{'name'}, $testName, $duration, $msg, $testStatuses->{$testName} ) if ( $report );
-                        $groupDuration = $groupDuration + $duration;
-                        $totalDuration = $totalDuration + $duration;
-			printResults( $testStatuses, $log, "Results so far" );
-		}
-
-                if ( $report ) {
-	           $report->systemOut( $logname, $group->{'name'});
-	           printGroupResultsXml( $report, $group->{'name'}, $testStatuses, $groupDuration );
-                }
-                $report = 0;
-                $groupDuration=0;
-
+				$benchmarkResult, $subLog);
+            #$report->testcase( $group->{'name'}, $testName, $duration, $msg, $testStatuses->{$testName}, $testResult ) if ( $report );
+            $report->testcase( $group->{'name'}, $testName, $duration, $msg, $testStatuses->{$testName} ) if ( $report );
+            $groupDuration = $groupDuration + $duration;
+                        if ($productForkFactor > 1) {
+			   printResults( $testStatuses, $subLog, "Results so far", basename($confFile), $groupName );
+                        } else {
+			   printResults( $testStatuses, $subLog, "Results so far" );
+                        }
+		}  # for each test
+
+        if ( $report ) {
+            $report->systemOut( $subLogName, $group->{'name'});
+            printGroupResultsXml( $report, $group->{'name'}, $testStatuses, $groupDuration );
+        }
+        $report = 0;
+        return $sawstart;
+}
 
-	}
 
-	# Do the global cleanup
-	$self->globalCleanup(\%globalHash, $log);
-}
 
 # TODO These should be removed
 

Modified: pig/branches/branch-0.11/test/e2e/harness/test_harness.pl
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/test/e2e/harness/test_harness.pl?rev=1404439&r1=1404438&r2=1404439&view=diff
==============================================================================
--- pig/branches/branch-0.11/test/e2e/harness/test_harness.pl (original)
+++ pig/branches/branch-0.11/test/e2e/harness/test_harness.pl Thu Nov  1 01:00:43 2012
@@ -62,11 +62,11 @@
 
 use strict;
 use File::Path;
+use File::Basename;
 use Getopt::Long;
 use Cwd;
 
 
-
 #  Var: $ROOT
 #  The root directory for the harness.
 #
@@ -289,6 +289,7 @@ if ( -e "$harnessCfg" ) {
    $globalCfg = readCfg("$harnessCfg");
    $globalCfg->{'harnessCfg'} = $harnessCfg;
    
+   TestDriver::dbg("Hadoop mapred local dir defined to be [" . $globalCfg->{'hadoop.mapred.local.dir'} . "]\n");
 } else {
    die "FATAL ERROR: $0 at ".__LINE__." - Configuration file <$harnessCfg> does NOT exist\n";
 }
@@ -386,7 +387,7 @@ if ($deploy) {
 	# Copy the global config into our cfg
 	foreach(keys(%$globalCfg)) {
 		next if $_ eq 'file';
-		$cfg->{$_} = $globalCfg->{$_}; #foreach(keys(%$globalCfg));
+		$cfg->{$_} = $globalCfg->{$_};
 	}
 
     # Instantiate the TestDeployer
@@ -465,28 +466,124 @@ if($dblog) {
 	print $log "Testrun id  $globalCfg->{'trid'}\n";
 }
 
-
 my %testStatuses;
+
+my $forkFactor = int($ENV{'FORK_FACTOR_FILE'});
+
+# NB: check if the group fork factor >1 and $startat is defined: such combination is not supported 
+# because in such case several groups are started semultaneously (in parallel):
+my $groupForkFactor = int($ENV{'FORK_FACTOR_GROUP'});
+if (($groupForkFactor > 1) && (defined $startat)) {
+    die "ERROR: '--startat' (or '-st') option is not supported when the group fork (parallel) factor > 1 (env. variable FORK_FACTOR_GROUP).\n";
+}
+
+if ($forkFactor > 1 || $groupForkFactor > 1) {
+    print "Configuration file fork factor: $forkFactor\n";
+    print "Group fork factor:              $groupForkFactor\n";
+}
+
+my $pm;
+if ($forkFactor > 1) {
+    print $log "Configuration file fork factor: $forkFactor\n";
+    $pm = new Parallel::ForkManager($forkFactor);
+    # this is a method that will run in the main process on each job subprocess completion:
+    $pm -> run_on_finish (
+        sub {
+          my ($pid, $exit_code, $identification, $exit_signal, $core_dump, $data_structure_reference) = @_; 
+          if (defined($data_structure_reference)) { 
+            TestDriver::dbg("Subprocess [$identification] finished, pid=$pid, sent back: $data_structure_reference.\n");
+            TestDriver::putAll(\%testStatuses, $data_structure_reference);
+          } else {
+            print "ERROR: Subprocess [$identification] did not send back anything. Exit code = $exit_code\n";
+          }
+          my $subLogAgain = "$logfile-$identification";  
+          TestDriver::appendFile($subLogAgain,$logfile);
+        }
+    );
+}
+
 foreach my $arg (@ARGV) {
-    print $log "INFO: $0 at ".__LINE__." : Loading configuration file $arg\n";
-	my $cfg = readCfg($arg);
-	# Copy contents of global config file into hash.
-	foreach(keys(%$globalCfg)) {
-		next if $_ eq 'file';
-		$cfg->{$_} = $globalCfg->{$_}; # foreach(keys(%$globalCfg));
-		print $log "\nINFO $0: $_=".$cfg->{$_};
-	}
-	print $log "\n"; 
+    my $cfg = readCfg($arg);
 
-	my $driver = TestDriverFactory::getTestDriver($cfg);
-        die "FATAL: $0: Driver does not exist\n" if ( !$driver );
-	$driver->run(\@testgroups, \@testMatches, $cfg, $log, $dbh, \%testStatuses, $arg, $startat, $logfile);
+    my $subLog;
+    my $subLogName;
+    # basename of the .conf file (like "cmdline.conf")
+    # which is unique identifier for this loop body:
+    my $jobId = basename($arg);
+    $cfg->{'job-id'} = $jobId;
+    if ($forkFactor > 1) {
+        #$jobId = basename($arg); # basename of the .conf file (like "cmdline.conf")
+        $subLogName = "$logfile-$jobId";
+        open $subLog, ">$subLogName" or die "FATAL ERROR $0 at ".__LINE__." : Can't open $subLogName, $!\n";
+        # PARALLEL SECTION START: ===============================================================================
+        $pm->start($jobId) and next;
+        TestDriver::dbg("Started configuration file job \"$jobId\"\n"); 
+    } else {
+        $subLog = $log;
+        $subLogName = $logfile;
+    }
+
+    print $subLog "INFO: $0 at ".__LINE__." : Loading configuration file $arg\n";
+    # Copy contents of global config file into hash.
+    foreach(keys(%$globalCfg)) {
+        next if $_ eq 'file';
+        $cfg->{$_} = $globalCfg->{$_}; # foreach(keys(%$globalCfg));
+        print $subLog "\nINFO $0: $_=".$cfg->{$_};
+    }
+    print $subLog "\n"; 
+
+    my $driver = TestDriverFactory::getTestDriver($cfg);
+    die "FATAL: $0: Driver does not exist\n" if ( !$driver );
+
+    # eval this in a separate block to catch possible error and exit status:
+    eval 
+    {
+       $driver->run(\@testgroups, \@testMatches, $cfg, $subLog, $dbh, \%testStatuses, $arg, $startat, $subLogName);
+    };
+    my $runStatus = $@;
+    my $runExitCode = $?; # exit code of the code block above.
+    if ($runStatus) {
+       print "ERROR: driver->run() returned the following error message [$runStatus].";
+    }
+
+    if ($forkFactor > 1) {
+        TestDriver::dbg("finishing config job [$jobId].\n");
+        $subLog -> close();
+        # NB: use run() exit code as the subprocess exit code:
+        # NB: send the "testStatuses" hash object reference (which is local to this subprocess) to the parent process: 
+        $pm -> finish($runExitCode, \%testStatuses);
+        # PARALLEL SECTION END. ===============================================================================
+    }
 }
+
+if ($forkFactor > 1) {
+    TestDriver::dbg("Waiting for the subprocesses...\n");
+    $pm->wait_all_children;
+    TestDriver::dbg("All subprocesses finished.\n");
+    # NB: in case of parallel execution we must reopen the $log descriptor 
+    # because we appended to that file in pm#run_on_finish() sub:
+    open $log, ">>$logfile";
+}
+
 $dbh->endTestRun($globalCfg->{'trid'}) if ($dblog);
 
-# don't remove the space after Final results, it matters.
-TestDriver::printResults(\%testStatuses, $log, "Final results ");
-print $log  "Finished test run at " . time . "\n";
+# cleanup temporary Hadoop directories	
+if( ($groupForkFactor>1 || $forkFactor>1) 
+      && defined($globalCfg->{'hadoop.mapred.local.dir'}) 
+      && $globalCfg->{'exectype'} eq "local") {
+    TestDriver::dbg("Deleting temporary hadoop directories for local exec mode: [" . $globalCfg->{'hadoop.mapred.local.dir'} . "].\n");
+    rmtree( $globalCfg->{'hadoop.mapred.local.dir'} );
+}
+
+# don't remove the space after "Final results", it matters.
+if ($forkFactor > 1) {
+   TestDriver::printResults(\%testStatuses, $log, "Final results ", "", "");
+} else {
+   TestDriver::printResults(\%testStatuses, $log, "Final results ");
+}
+my $finishStr = "Finished test run at " . time . "\n";
+print $log $finishStr;
+TestDriver::dbg($finishStr);
 
 # If they have requested undeployment, do it now
 if ($undeploy) {

Modified: pig/branches/branch-0.11/test/e2e/pig/build.xml
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/test/e2e/pig/build.xml?rev=1404439&r1=1404438&r2=1404439&view=diff
==============================================================================
--- pig/branches/branch-0.11/test/e2e/pig/build.xml (original)
+++ pig/branches/branch-0.11/test/e2e/pig/build.xml Thu Nov  1 01:00:43 2012
@@ -245,6 +245,12 @@
   <target name="test-base" depends="property-check, udfs, tar, init-test">
     <!-- If they have not specified tests to run then null it out -->
     <property name="tests.to.run" value=""/>
+    <!-- fork (parallelization) factors for e2e tests execution. 
+	Defaults are 1, which means *no* parellelization: -->
+    <property name="fork.factor.group" value="1"/>
+    <property name="fork.factor.conf.file" value="1"/>
+    <property name="hadoop.mapred.local.dir" value="/tmp/hadoop/mapred/local"/>
+    <property name="e2e.debug" value="false"/>
 
     <exec executable="./test_harness.pl" dir="${test.location}" failonerror="true">
       <env key="HARNESS_ROOT" value="."/>
@@ -272,6 +278,11 @@
       <env key="PH_BENCHMARK_CACHE_PATH" value="${PH_BENCHMARK_CACHE_PATH}"/>
       <env key="HCAT_BIN" value="${hcat.bin}"/>
       <env key="PERL5LIB" value="${harness.PERL5LIB}"/>
+      <env key="FORK_FACTOR_GROUP" value="${fork.factor.group}"/>
+      <env key="FORK_FACTOR_FILE" value="${fork.factor.conf.file}"/>
+      <env key="HADOOP_MAPRED_LOCAL_DIR" value="${hadoop.mapred.local.dir}"/>
+      <env key="E2E_DEBUG" value="${e2e.debug}"/>
+
       <arg line="${tests.to.run}"/>
       <arg value="${test.location}/tests/cmdline.conf"/>
       <arg value="${test.location}/tests/multiquery.conf"/>

Modified: pig/branches/branch-0.11/test/e2e/pig/conf/local.conf
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/test/e2e/pig/conf/local.conf?rev=1404439&r1=1404438&r2=1404439&view=diff
==============================================================================
--- pig/branches/branch-0.11/test/e2e/pig/conf/local.conf (original)
+++ pig/branches/branch-0.11/test/e2e/pig/conf/local.conf Thu Nov  1 01:00:43 2012
@@ -58,6 +58,8 @@ $cfg = {
 
     , 'userhomePath' => "$ENV{HOME}"
     ,'local.bin'     => '/usr/bin'
+    # NB: we provide the default there as a fallback in case if there is no corresponding ENV key:
+    ,'hadoop.mapred.local.dir'     => defined $ENV{'HADOOP_MAPRED_LOCAL_DIR'} ? $ENV{'HADOOP_MAPRED_LOCAL_DIR'} : "/tmp/hadoop/mapred/local"
  
     ,'logDir'                => "$ENV{PH_OUT}/log" 
 	,'propertiesFile'     => "./conf/testpropertiesfile.conf"

Modified: pig/branches/branch-0.11/test/e2e/pig/deployers/ExistingClusterDeployer.pm
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/test/e2e/pig/deployers/ExistingClusterDeployer.pm?rev=1404439&r1=1404438&r2=1404439&view=diff
==============================================================================
--- pig/branches/branch-0.11/test/e2e/pig/deployers/ExistingClusterDeployer.pm (original)
+++ pig/branches/branch-0.11/test/e2e/pig/deployers/ExistingClusterDeployer.pm Thu Nov  1 01:00:43 2012
@@ -68,15 +68,15 @@ sub checkPrerequisites
     # They must have declared the conf directory for their Hadoop installation
     if (! defined $cfg->{'hadoopconfdir'} || $cfg->{'hadoopconfdir'} eq "") {
         print $log "You must set the key 'hadoopconfdir' to your Hadoop conf directory "
-            . "in existing.conf\n";
-        die "hadoopconfdir is not set in existing.conf\n";
+            . "in existing_deployer.conf\n";
+        die "hadoopconfdir is not set in existing_deployer.conf\n";
     }
     
     # They must have declared the executable path for their Hadoop installation
     if (! defined $cfg->{'hadoopbin'} || $cfg->{'hadoopbin'} eq "") {
         print $log "You must set the key 'hadoopbin' to your Hadoop bin path"
-            . "in existing.conf\n";
-        die "hadoopbin is not set in existing.conf\n";
+            . "in existing_deployer.conf\n";
+        die "hadoopbin is not set in existing_deployer.conf\n";
     }
 
     # Run a quick and easy Hadoop command to make sure we can
@@ -340,7 +340,7 @@ sub undeploy
 #
 sub confirmUndeployment
 {
-    die "$0 INFO : confirmUndeployment is a virtual function!";
+    # TODO: implement a correct confirmation, but let's not die there.
 }
 
 # TODO

Modified: pig/branches/branch-0.11/test/e2e/pig/drivers/TestDriverPig.pm
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/test/e2e/pig/drivers/TestDriverPig.pm?rev=1404439&r1=1404438&r2=1404439&view=diff
==============================================================================
--- pig/branches/branch-0.11/test/e2e/pig/drivers/TestDriverPig.pm (original)
+++ pig/branches/branch-0.11/test/e2e/pig/drivers/TestDriverPig.pm Thu Nov  1 01:00:43 2012
@@ -104,13 +104,13 @@ sub replaceParameters
 sub globalSetup
 {
     my ($self, $globalHash, $log) = @_;
-    my $subName = (caller(0))[3];
-
 
     # Setup the output path
     my $me = `whoami`;
     chomp $me;
-    $globalHash->{'runid'} = $me . "." . time;
+    my $jobId = $globalHash->{'job-id'};
+    my $timeId = time;
+    $globalHash->{'runid'} = $me . "-" . $timeId . "-" . $jobId;
 
     # if "-ignore false" was provided on the command line,
     # it means do run tests even when marked as 'ignore'
@@ -121,51 +121,61 @@ sub globalSetup
 
     $globalHash->{'outpath'} = $globalHash->{'outpathbase'} . "/" . $globalHash->{'runid'} . "/";
     $globalHash->{'localpath'} = $globalHash->{'localpathbase'} . "/" . $globalHash->{'runid'} . "/";
+    $globalHash->{'tmpPath'} = $globalHash->{'tmpPath'} . "/" . $globalHash->{'runid'} . "/";
+}
+
+sub globalSetupConditional() {
+    my ($self, $globalHash, $log) = @_;
 
     # add libexec location to the path
     if (defined($ENV{'PATH'})) {
         $ENV{'PATH'} = $globalHash->{'scriptPath'} . ":" . $ENV{'PATH'};
-    }
-    else {
+    } else {
         $ENV{'PATH'} = $globalHash->{'scriptPath'};
     }
 
     my @cmd = ($self->getPigCmd($globalHash, $log), '-e', 'mkdir', $globalHash->{'outpath'});
-
-
 	print $log "Going to run " . join(" ", @cmd) . "\n";
-    IPC::Run::run(\@cmd, \undef, $log, $log) or die "Cannot create HDFS directory " . $globalHash->{'outpath'} . ": $? - $!\n";
+    IPC::Run::run(\@cmd, \undef, $log, $log) or die "$0 at ".__LINE__.": Cannot create HDFS directory " . $globalHash->{'outpath'} . ": $? - $!\n";
 
     IPC::Run::run(['mkdir', '-p', $globalHash->{'localpath'}], \undef, $log, $log) or 
-        die "Cannot create localpath directory " . $globalHash->{'localpath'} .
-        " " . "$ERRNO\n";
+       die "$0 at ".__LINE__.": Cannot create localpath directory [" . $globalHash->{'localpath'} .
+         "]: " . "$ERRNO\n";
 
     # Create the temporary directory
     IPC::Run::run(['mkdir', '-p', $globalHash->{'tmpPath'}], \undef, $log, $log) or 
-        die "Cannot create temporary directory " . $globalHash->{'tmpPath'} .
-        " " . "$ERRNO\n";
+       die "$0 at ".__LINE__.": Cannot create localpath directory [" . $globalHash->{'tmpPath'} .
+         "]: " . "$ERRNO\n";
 
     # Create the HDFS temporary directory
     @cmd = ($self->getPigCmd($globalHash, $log), '-e', 'mkdir', "tmp/$globalHash->{'runid'}");
 	print $log "Going to run " . join(" ", @cmd) . "\n";
-    IPC::Run::run(\@cmd, \undef, $log, $log) or die "Cannot create HDFS directory " . $globalHash->{'outpath'} . ": $? - $!\n";
+    IPC::Run::run(\@cmd, \undef, $log, $log) or die "$0 at ".__LINE__.": Cannot create HDFS directory " . "tmp/$globalHash->{'runid'}" . ": $? - $!\n";
 }
 
-sub globalCleanup
+sub globalCleanup()
 {
+    # noop there because the removal of temp directories, which are created in #globalSetupConditional(), is to be
+    # performed in method #globalCleanupConditional().
+}
+
+sub globalCleanupConditional() {
     my ($self, $globalHash, $log) = @_;
 
+    # NB: both local and HDFS output directories are not removed there, because these data may 
+    # be needed to investigate the tests failures.
+
     IPC::Run::run(['rm', '-rf', $globalHash->{'tmpPath'}], \undef, $log, $log) or 
-        warn "Cannot remove temporary directory " . $globalHash->{'tmpPath'} .
-        " " . "$ERRNO\n";
+       warn "Cannot remove temporary directory " . $globalHash->{'tmpPath'} .
+           " " . "$ERRNO\n";
 
     # Cleanup the HDFS temporary directory
     my @cmd = ($self->getPigCmd($globalHash, $log), '-e', 'fs', '-rmr', "tmp/$globalHash->{'runid'}");
-	print $log "Going to run " . join(" ", @cmd) . "\n";
-    IPC::Run::run(\@cmd, \undef, $log, $log) or die "Cannot create HDFS directory " . $globalHash->{'outpath'} . ": $? - $!\n";
+    print $log "Going to run: [" . join(" ", @cmd) . "]\n";
+    IPC::Run::run(\@cmd, \undef, $log, $log)
+       or die "$0 at ".__LINE__.": Cannot remove HDFS directory " . "tmp/$globalHash->{'runid'}" . ": $? - $!\n";
 }
 
-
 sub runTest
 {
     my ($self, $testCmd, $log) = @_;
@@ -337,6 +347,17 @@ sub runScript
     return \%result;
 }
 
+sub hadoopLocalTmpDir($$)
+{
+    my ($self, $testCmd) = @_;
+
+    if (defined($testCmd->{'hadoop.mapred.local.dir'}) 
+         && (int($ENV{'FORK_FACTOR_GROUP'})>1 || int($ENV{'FORK_FACTOR_FILE'})>1)) {
+        return $testCmd->{'hadoop.mapred.local.dir'} . "/" . $PID;
+    } else {
+        return undef;
+    }
+}
 
 sub getPigCmd($$$)
 {
@@ -359,13 +380,25 @@ sub getPigCmd($$$)
         push(@pigCmd, '-Dpig.additional.jars='.$testCmd->{'additionaljars'});
     }
 
+    my $additionalJavaParams = undef;
     if ($testCmd->{'exectype'} eq "local") {
-		push(@{$testCmd->{'java_params'}}, "-Xmx1024m");
+        $additionalJavaParams = "-Xmx1024m";
+        my $hadoopTmpDir = $self->hadoopLocalTmpDir($testCmd);
+        if (defined($hadoopTmpDir)) {
+           $additionalJavaParams .= " -Dmapred.local.dir=$hadoopTmpDir -Dmapreduce.cluster.local.dir=$hadoopTmpDir";
+        }
+        TestDriver::dbg("Additional java parameters: [$additionalJavaParams].\n");
+
         push(@pigCmd, ("-x", "local"));
     }
 
-    if (defined($testCmd->{'java_params'})) {
-		$ENV{'PIG_OPTS'} = join(" ", @{$testCmd->{'java_params'}});
+    if (defined($testCmd->{'java_params'}) || defined($additionalJavaParams)) {
+        if (defined($testCmd->{'java_params'})) {
+	   $ENV{'PIG_OPTS'} = join(" ", @{$testCmd->{'java_params'}}, $additionalJavaParams);
+        } else {
+           $ENV{'PIG_OPTS'} = $additionalJavaParams;
+        }
+        TestDriver::dbg("PIG_OPTS set to be: [$ENV{'PIG_OPTS'}].\n");
     } else {
         $ENV{'PIG_OPTS'} = undef;
     }