You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by sm...@apache.org on 2012/10/24 05:24:39 UTC

svn commit: r1401530 - in /pig/trunk: CHANGES.txt build.xml ivy.xml ivy/libraries.properties test/org/apache/pig/test/TestHBaseStorage.java test/org/apache/pig/test/TestJobSubmission.java

Author: sms
Date: Wed Oct 24 03:24:39 2012
New Revision: 1401530

URL: http://svn.apache.org/viewvc?rev=1401530&view=rev
Log:
PIG-2885: TestJobSubmission and TestHBaseStorage don't work with HBase 0.94 and ZK 3.4.3 (cheolsoo via sms)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/build.xml
    pig/trunk/ivy.xml
    pig/trunk/ivy/libraries.properties
    pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
    pig/trunk/test/org/apache/pig/test/TestJobSubmission.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1401530&r1=1401529&r2=1401530&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Oct 24 03:24:39 2012
@@ -38,6 +38,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-2885: TestJobSumission and TestHBaseStorage don't work with HBase 0.94 and ZK 3.4.3 (cheolsoo via sms)
+
 PIG-2928: Fix e2e test failures in trunk: FilterBoolean_23/24 (cheolsoo via dvryaboy)
 
 Release 0.11.0 (unreleased)

Modified: pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/build.xml?rev=1401530&r1=1401529&r2=1401530&view=diff
==============================================================================
--- pig/trunk/build.xml (original)
+++ pig/trunk/build.xml Wed Oct 24 03:24:39 2012
@@ -688,13 +688,12 @@
                 <include name="jackson-core-asl-${jackson.version}.jar" />
                 <include name="joda-time-${joda-time.version}.jar" />
                 <include name="guava-${guava.version}.jar" />
+                <include name="protobuf-java-${protobuf-java.version}.jar" />
                 <include name="automaton-${automaton.version}.jar" />
                 <include name="avro-${avro.version}.jar" />
-                <include name="hbase-${hbase.version}.jar" />
                 <include name="commons*.jar" />
                 <include name="log4j*.jar" />
                 <include name="jsp-api*.jar" />
-                <include name="zookeeper*.jar" />
                 <include name="jansi-${jansi.version}.jar" />
             </zipgroupfileset>
             <fileset file="${basedir}/src/pig-default.properties" />

Modified: pig/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/pig/trunk/ivy.xml?rev=1401530&r1=1401529&r2=1401530&view=diff
==============================================================================
--- pig/trunk/ivy.xml (original)
+++ pig/trunk/ivy.xml Wed Oct 24 03:24:39 2012
@@ -172,7 +172,7 @@
       conf="compile->master;test->master"/>
     <dependency org="commons-cli" name="commons-cli" rev="${commons-cli.version}"
       conf="compile->master;checkstyle->master"/>
-    
+
     <dependency org="org.apache.avro" name="avro" rev="${avro.version}"
       conf="compile->default;checkstyle->master"/>
     <dependency org="com.googlecode.json-simple" name="json-simple" rev="${json-simple.version}"
@@ -183,7 +183,7 @@
     <dependency org="xalan" name="xalan" rev="${xalan.version}"
       conf="test->default"/>
     <dependency org="xerces" name="xercesImpl" rev="${xerces.version}"
-	    conf="jdiff->default;test->default"/>
+      conf="jdiff->default;test->default"/>
     <dependency org="com.jcraft" name="jsch" rev="${jsch.version}"
       conf="compile->master"/>
     <dependency org="jline" name="jline" rev="${jline.version}"
@@ -191,7 +191,7 @@
     <dependency org="net.java.dev.javacc" name="javacc" rev="${javacc.version}"
       conf="compile->master"/>
     <dependency org="junit" name="junit" rev="${junit.version}"
-	    conf="test->master"/>
+      conf="test->master"/>
     <dependency org="com.google.code.p.arat" name="rat-lib" rev="${rats-lib.version}"
       conf="releaseaudit->default"/>
     <dependency org="org.codehaus.groovy" name="groovy-all" rev="${groovy.version}"
@@ -225,20 +225,29 @@
       <exclude org="org.apache.ant" module="ant" />
       <exclude org="org.slf4j" module="slf4j"/>
       <exclude org="org.slf4j" module="slf4j-api"/>
-      <exclude org="org.slf4j" module="log4j12"/>
       <exclude org="org.slf4j" module="slf4j-log4j12" />
+      <exclude org="org.slf4j" module="log4j12"/>
+      <exclude org="org.slf4j" module="log4j-over-slf4j"/>
       <exclude org="stax" module="stax-api" />
       <exclude org="javax.xml.bind" module="jaxb-api" />
       <exclude org="javax.ws.rs" module="jsr311-api" />
-      <exclude org="com.google.protobuf" module="protobuf-java"/>
       <exclude org="tomcat" module="jasper-runtime"/>
       <exclude org="tomcat" module="jasper-compiler"/>
-      <exclude org="org.slf4j" module="log4j-over-slf4j"/>
+      <exclude org="com.google.protobuf" module="protobuf-java"/>
       <exclude org="com.sun.jersey" module="jersey-core"/>
       <exclude org="com.sun.jersey" module="jersey-server"/>
       <exclude org="com.sun.jersey" module="jersey-json"/>
       <exclude org="asm" module="asm"/>
     </dependency>
+
+    <!-- for TestHBaseStorage -->
+    <dependency org="com.github.stephenc.high-scale-lib" name="high-scale-lib" rev="${high-scale-lib.version}"
+       conf="test->default"/>
+    <dependency org="com.google.protobuf" name="protobuf-java" rev="${protobuf-java.version}"
+       conf="test->default"/>
+    <dependency org="com.yammer.metrics" name="metrics-core" rev="${metrics-core.version}"
+       conf="test->default"/>
+
     <!-- for piggybank -->
     <dependency org="hsqldb" name="hsqldb" rev="${hsqldb.version}"
       conf="test->default" />
@@ -248,7 +257,8 @@
     <dependency org="org.vafer" name="jdeb" rev="${jdeb.version}"
       conf="compile->master">
     </dependency>
-    
+
     <dependency org="org.mockito" name="mockito-all" rev="${mockito.version}" conf="compile->default"/>
     </dependencies>
 </ivy-module>
+

Modified: pig/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/trunk/ivy/libraries.properties?rev=1401530&r1=1401529&r2=1401530&view=diff
==============================================================================
--- pig/trunk/ivy/libraries.properties (original)
+++ pig/trunk/ivy/libraries.properties Wed Oct 24 03:24:39 2012
@@ -40,7 +40,7 @@ hadoop-test.version=1.0.0
 hadoop-common.version=2.0.0-alpha
 hadoop-hdfs.version=2.0.0-alpha
 hadoop-mapreduce.version=2.0.0-alpha
-hbase.version=0.90.0
+hbase.version=0.94.1
 hsqldb.version=1.8.0.10
 hive.version=0.8.0
 httpcomponents.version=4.1
@@ -71,13 +71,15 @@ slf4j-log4j12.version=1.6.1
 xerces.version=2.10.0
 xalan.version=2.7.1
 wagon-http.version=1.0-beta-2
-zookeeper.version=3.3.3
+zookeeper.version=3.4.4
 servlet.version=4.0.6
 servlet-api.version=2.5
 protobuf-java.version=2.4.0a
+high-scale-lib.version=1.1.1
+metrics-core.version=2.1.2
 guice.version=3.0
 guice-servlet.version=3.0
 aopalliance.version=1.0
 jsr311-api.version=1.1.1
 mockito.version=1.8.4
-jansi.version=1.9
\ No newline at end of file
+jansi.version=1.9

Modified: pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java?rev=1401530&r1=1401529&r2=1401530&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java Wed Oct 24 03:24:39 2012
@@ -5,9 +5,9 @@
  * licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -57,7 +57,7 @@ public class TestHBaseStorage {
     private static final Log LOG = LogFactory.getLog(TestHBaseStorage.class);
     private static HBaseTestingUtility util;
     private static Configuration conf;
-    private static MiniCluster cluster; 
+    private static MiniCluster cluster;
     private static PigServer pig;
 
     final static int NUM_REGIONSERVERS = 1;
@@ -122,7 +122,7 @@ public class TestHBaseStorage {
     // DVR: I've found that it is faster to delete all rows in small tables
     // than to drop them.
     private void deleteAllRows(String tableName) throws Exception {
-        HTable table = new HTable(tableName);
+        HTable table = new HTable(conf, tableName);
         ResultScanner scanner = table.getScanner(new Scan());
         List<Delete> deletes = Lists.newArrayList();
         for (Result row : scanner) {
@@ -191,23 +191,23 @@ public class TestHBaseStorage {
     @Test
     public void testLoadWithSpecifiedTimestampAndRanges() throws IOException {
         long beforeTimeStamp = System.currentTimeMillis() - 10;
-        
+
         HTable table = prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
 
         long afterTimeStamp = System.currentTimeMillis() + 10;
-        
+
         Assert.assertEquals("MaxTimestamp is set before rows added", 0, queryWithTimestamp(null , beforeTimeStamp, null));
-        
+
         Assert.assertEquals("MaxTimestamp is set after rows added", TEST_ROW_COUNT, queryWithTimestamp(null, afterTimeStamp, null));
-        
+
         Assert.assertEquals("MinTimestamp is set after rows added", 0, queryWithTimestamp(afterTimeStamp, null, null));
-        
+
         Assert.assertEquals("MinTimestamp is set before rows added", TEST_ROW_COUNT, queryWithTimestamp(beforeTimeStamp, null, null));
-        
+
         Assert.assertEquals("Timestamp range is set around rows added", TEST_ROW_COUNT, queryWithTimestamp(beforeTimeStamp, afterTimeStamp, null));
-        
+
         Assert.assertEquals("Timestamp range is set after rows added", 0, queryWithTimestamp(afterTimeStamp, afterTimeStamp + 10, null));
-        
+
         Assert.assertEquals("Timestamp range is set before rows added", 0, queryWithTimestamp(beforeTimeStamp - 10, beforeTimeStamp, null));
 
         Assert.assertEquals("Timestamp is set before rows added", 0, queryWithTimestamp(null, null, beforeTimeStamp));
@@ -215,22 +215,22 @@ public class TestHBaseStorage {
         Assert.assertEquals("Timestamp is set after rows added", 0, queryWithTimestamp(null, null, afterTimeStamp));
 
         long specifiedTimestamp = table.get(new Get(Bytes.toBytes("00"))).getColumnLatest(COLUMNFAMILY, Bytes.toBytes("col_a")).getTimestamp();
-        
+
         Assert.assertTrue("Timestamp is set equals to row 01", queryWithTimestamp(null, null, specifiedTimestamp) > 0);
-         
-        
+
+
         LOG.info("LoadFromHBase done");
     }
 
     private int queryWithTimestamp(Long minTimestamp, Long maxTimestamp, Long timestamp) throws IOException,
             ExecException {
-        
+
         StringBuilder extraParams = new StringBuilder();
-        
+
         if (minTimestamp != null){
             extraParams.append(" -minTimestamp " + minTimestamp + " ");
         }
-        
+
         if (maxTimestamp != null){
             extraParams.append(" -maxTimestamp " + maxTimestamp + " ");
         }
@@ -238,8 +238,8 @@ public class TestHBaseStorage {
         if (timestamp != null){
             extraParams.append(" -timestamp " + timestamp + " ");
         }
-        
-        
+
+
         pig.registerQuery("a = load 'hbase://"
                 + TESTTABLE_1
                 + "' using "
@@ -396,7 +396,7 @@ public class TestHBaseStorage {
         Assert.assertEquals(TEST_ROW_COUNT, count);
         LOG.info("LoadFromHBase done");
     }
-  
+
     /**
      *     * Test Load from hbase with map parameters and with a
      *     static column
@@ -438,7 +438,7 @@ public class TestHBaseStorage {
 
     /**
      * load from hbase test
-     * 
+     *
      * @throws IOException
      */
     @Test
@@ -473,7 +473,7 @@ public class TestHBaseStorage {
 
     /**
      * load from hbase test without hbase:// prefix
-     * 
+     *
      * @throws IOException
      */
     @Test
@@ -503,7 +503,7 @@ public class TestHBaseStorage {
 
     /**
      * load from hbase test including the row key as the first column
-     * 
+     *
      * @throws IOException
      */
     @Test
@@ -537,7 +537,7 @@ public class TestHBaseStorage {
 
     /**
      * Test Load from hbase with parameters lte and gte (01<=key<=98)
-     * 
+     *
      */
     @Test
     public void testLoadWithParameters_1() throws IOException {
@@ -732,7 +732,7 @@ public class TestHBaseStorage {
     /**
      * load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it into
      * 'TESTTABLE_2' using HBaseBinaryFormat
-     * 
+     *
      * @throws IOException
      */
     @Test
@@ -745,7 +745,7 @@ public class TestHBaseStorage {
                 "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
                 + TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
                 + TESTCOLUMN_C + "','-caster HBaseBinaryConverter')");
-        HTable table = new HTable(TESTTABLE_2);
+        HTable table = new HTable(conf, TESTTABLE_2);
         ResultScanner scanner = table.getScanner(new Scan());
         Iterator<Result> iter = scanner.iterator();
         int i = 0;
@@ -783,7 +783,7 @@ public class TestHBaseStorage {
                 + TESTCOLUMN_A + " " + TESTCOLUMN_B +
                 "','-caster HBaseBinaryConverter')");
 
-        HTable table = new HTable(TESTTABLE_2);
+        HTable table = new HTable(conf, TESTTABLE_2);
         ResultScanner scanner = table.getScanner(new Scan());
         Iterator<Result> iter = scanner.iterator();
         int i = 0;
@@ -804,7 +804,7 @@ public class TestHBaseStorage {
     /**
      * load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it into
      * 'TESTTABLE_2' using UTF-8 Plain Text format
-     * 
+     *
      * @throws IOException
      */
     @Test
@@ -817,7 +817,7 @@ public class TestHBaseStorage {
                 + TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
                 + TESTCOLUMN_C + "')");
 
-        HTable table = new HTable(TESTTABLE_2);
+        HTable table = new HTable(conf, TESTTABLE_2);
         ResultScanner scanner = table.getScanner(new Scan());
         Iterator<Result> iter = scanner.iterator();
         int i = 0;
@@ -881,7 +881,7 @@ public class TestHBaseStorage {
                 "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
                 + TESTCOLUMN_A + " " + TESTCOLUMN_B + "')");
 
-        HTable table = new HTable(TESTTABLE_2);
+        HTable table = new HTable(conf, TESTTABLE_2);
         ResultScanner scanner = table.getScanner(new Scan());
         Iterator<Result> iter = scanner.iterator();
         int i = 0;
@@ -915,7 +915,7 @@ public class TestHBaseStorage {
                 "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
                 + TESTCOLUMN_A + " " + TESTCOLUMN_B + "')");
 
-        HTable table = new HTable(TESTTABLE_2);
+        HTable table = new HTable(conf, TESTTABLE_2);
         ResultScanner scanner = table.getScanner(new Scan());
         Iterator<Result> iter = scanner.iterator();
         int i = 0;
@@ -994,7 +994,7 @@ public class TestHBaseStorage {
 
     /**
      * Prepare a table in hbase for testing.
-     * 
+     *
      */
     private HTable prepareTable(String tableName, boolean initData,
             DataFormat format) throws IOException {
@@ -1009,7 +1009,7 @@ public class TestHBaseStorage {
         table = util.createTable(Bytes.toBytesBinary(tableName),
                 COLUMNFAMILY);
         } catch (Exception e) {
-            table = new HTable(Bytes.toBytesBinary(tableName));
+            table = new HTable(conf, Bytes.toBytesBinary(tableName));
         }
 
         if (initData) {

Modified: pig/trunk/test/org/apache/pig/test/TestJobSubmission.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java?rev=1401530&r1=1401529&r2=1401530&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestJobSubmission.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestJobSubmission.java Wed Oct 24 03:24:39 2012
@@ -26,6 +26,7 @@ import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapred.jobcontrol.Job;
@@ -55,8 +56,8 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestJobSubmission {
-    
-    
+
+
     static PigContext pc;
     String ldFile;
     String expFile;
@@ -69,7 +70,7 @@ public class TestJobSubmission {
     String inpDir;
     String golDir;
     static MiniCluster cluster = MiniCluster.buildCluster();
-    
+
     @BeforeClass
     public static void onetimeSetUp() throws Exception {
         pc = new PigContext(ExecType.MAPREDUCE, cluster.getProperties());
@@ -81,11 +82,11 @@ public class TestJobSubmission {
         }
         GenPhyOp.setPc(pc);
         Util.copyFromLocalToCluster(cluster, "test/org/apache/pig/test/data/passwd", "/passwd");
-        
+
         Configuration conf = cluster.getConfiguration();
-        
+
     }
-    
+
     @Before
     public void setUp() throws Exception{
         curDir = System.getProperty("user.dir");
@@ -96,20 +97,20 @@ public class TestJobSubmission {
         if ((System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")))
             golDir="/"+FileLocalizer.parseCygPath(golDir, FileLocalizer.STYLE_WINDOWS);
     }
-    
+
     @After
     public void tearDown() throws Exception {
     }
-    
+
     @AfterClass
     public static void oneTimeTearDown() throws Exception {
         cluster.shutDown();
     }
-    
+
 /*    private void generateInput(int numTuples) throws ExecException{
-        
+
         DataBag inpDb = GenRandomData.genRandSmallTupDataBag(r, numTuples, 1000);
-        
+
         POProject proj = new POProject(new OperatorKey("", r.nextLong()));
         Tuple t = new DefaultTuple();
         t.append(inpDb);
@@ -117,31 +118,31 @@ public class TestJobSubmission {
         proj.setColumn(0);
         proj.setOverloaded(true);
         proj.setResultType(DataType.TUPLE);
-        
+
         List<PhysicalOperator> inps = new ArrayList<PhysicalOperator>();
         inps.add(proj);
-        
+
         POStore str = new POStore(new OperatorKey("", r.nextLong()));
         str.setInputs(inps);
-        
+
         FileSpec fSpec = new FileSpec(ldFile, new FuncSpec(PigStorage.class.getName()));
-        
+
         str.setSFile(fSpec);
         str.setPc(pc);
         str.store();
     }
-    
+
     private void setUp1(boolean gen) throws Exception {
         ldFile = "file:" + inpDir + "jsTst1.txt";
         expFile = ldFile;
         stFile = "jsTst1";
         grpName = "jobSubTst1";
-        
+
         if(gen){
             generateInput(100);
             return;
         }
-        
+
         hadoopLdFile = FileLocalizer.hadoopify(ldFile, pc);
 
         FileSpec LFSpec = new FileSpec(hadoopLdFile,new FuncSpec(PigStorage.class.getName()));
@@ -153,7 +154,7 @@ public class TestJobSubmission {
         ld.setLFile(LFSpec);
         st.setPc(pc);
         st.setSFile(SFSpec);
-        
+
         php.add(ld);
         php.add(st);
         php.connect(ld, st);
@@ -164,31 +165,31 @@ public class TestJobSubmission {
         boolean gen = false;
 
         setUp1(gen);
-        
+
         if(gen)
             return;
 
         submit();
-        
+
         assertEquals(true, FileLocalizer.fileExists(stFile, pc));
-        
+
         FileSpec fSpecExp = new FileSpec(expFile, new FuncSpec(PigStorage.class.getName()));
         FileSpec fSpecAct = new FileSpec(stFile, new FuncSpec(PigStorage.class.getName()));
-        
+
         assertEquals(true, TestHelper.areFilesSame(fSpecExp, fSpecAct, pc));
     }
-    
+
     private void setUp2(boolean gen) throws Exception {
         ldFile = "file:" + inpDir + "jsTst2.txt";
         expFile = ldFile;
         stFile = "jsTst2";
         grpName = "jobSubTst2";
-        
+
         if(gen){
             generateInput(1000);
             return;
         }
-        
+
         hadoopLdFile = FileLocalizer.hadoopify(ldFile, pc);
 
         FileSpec LFSpec = new FileSpec(hadoopLdFile, new FuncSpec(PigStorage.class.getName()));
@@ -200,7 +201,7 @@ public class TestJobSubmission {
         ld.setLFile(LFSpec);
         st.setPc(pc);
         st.setSFile(SFSpec);
-        
+
         php.add(ld);
         php.add(st);
         php.connect(ld, st);
@@ -211,31 +212,31 @@ public class TestJobSubmission {
         boolean gen = false;
 
         setUp2(gen);
-        
+
         if(gen)
             return;
 
         submit();
-        
+
         assertEquals(true, FileLocalizer.fileExists(stFile, pc));
-        
+
         FileSpec fSpecExp = new FileSpec(expFile,new FuncSpec(PigStorage.class.getName()));
         FileSpec fSpecAct = new FileSpec(stFile,new FuncSpec(PigStorage.class.getName()));
-        
+
         assertEquals(true, TestHelper.areFilesSame(fSpecExp, fSpecAct, pc));
     }
-    
+
     private void setUp3(boolean gen) throws Exception {
         ldFile = "file:" + inpDir + "jsTst1.txt";
         expFile = "file:" + golDir + "jsTst3";
         stFile = "jsTst3";
         grpName = "jobSubTst3";
-        
+
         if(gen){
             generateInput(1000);
             return;
         }
-        
+
         hadoopLdFile = FileLocalizer.hadoopify(ldFile, pc);
 
         FileSpec LFSpec = new FileSpec(hadoopLdFile, new FuncSpec(PigStorage.class.getName()));
@@ -247,23 +248,23 @@ public class TestJobSubmission {
         ld.setLFile(LFSpec);
         st.setPc(pc);
         st.setSFile(SFSpec);
-        
+
         int[] flds = {0,1};
         Tuple sample = new DefaultTuple();
         sample.append(new String("S"));
         sample.append(new Integer("10"));
-        
+
         POForEach fe = GenPhyOp.topForEachOPWithPlan(flds , sample);
-        
+
         POFilter fl = GenPhyOp.topFilterOpWithProj(1, 500, GenPhyOp.LT);
-        
+
         php.add(ld);
         php.add(fe);
         php.connect(ld, fe);
-        
+
         php.add(fl);
         php.connect(fe, fl);
-        
+
         php.add(st);
         php.connect(fl, st);
      }
@@ -273,31 +274,31 @@ public class TestJobSubmission {
         boolean gen = false;
 
         setUp3(gen);
-        
+
         if(gen)
             return;
 
         submit();
-        
+
         assertEquals(true, FileLocalizer.fileExists(stFile, pc));
-        
+
         FileSpec fSpecExp = new FileSpec(expFile, new FuncSpec(PigStorage.class.getName(), new String[]{","}));
         FileSpec fSpecAct = new FileSpec(stFile,new FuncSpec(PigStorage.class.getName()));
-        
+
         assertEquals(true, TestHelper.areFilesSame(fSpecExp, fSpecAct, pc));
     }
-    
+
     private void setUp4(boolean gen) throws Exception {
         ldFile = "file:" + inpDir + "jsTst1.txt";
         expFile = "file:" + golDir + "jsTst4";
         stFile = "jsTst4";
         grpName = "jobSubTst4";
-        
+
         if(gen){
             generateInput(1000);
             return;
         }
-        
+
         hadoopLdFile = FileLocalizer.hadoopify(ldFile, pc);
 
         FileSpec LFSpec = new FileSpec(hadoopLdFile,new FuncSpec(PigStorage.class.getName()));
@@ -309,27 +310,27 @@ public class TestJobSubmission {
         ld.setLFile(LFSpec);
         st.setPc(pc);
         st.setSFile(SFSpec);
-        
+
         POSplit spl = GenPhyOp.topSplitOp();
         POFilter fl1 = GenPhyOp.topFilterOpWithProjWithCast(1, 200, GenPhyOp.LT);
         POFilter fl2 = GenPhyOp.topFilterOpWithProjWithCast(1, 800, GenPhyOp.GT);
-        
+
         POUnion un = GenPhyOp.topUnionOp();
-        
+
         php.add(ld);
         php.add(spl);
         php.connect(ld, spl);
-        
+
         php.add(fl1);
         php.connect(spl, fl1);
-        
+
         php.add(fl2);
         php.connect(spl, fl2);
-        
+
         php.add(un);
         php.connect(fl1, un);
         php.connect(fl2, un);
-        
+
         php.add(st);
         php.connect(un, st);
      }
@@ -339,32 +340,32 @@ public class TestJobSubmission {
         boolean gen = false;
 
         setUp4(gen);
-        
+
         if(gen)
             return;
-        
+
         submit();
-        
+
         assertEquals(true, FileLocalizer.fileExists(stFile, pc));
-        
+
         FileSpec fSpecExp = new FileSpec(expFile, new FuncSpec(PigStorage.class.getName(), new String[]{","}));
         FileSpec fSpecAct = new FileSpec(stFile,new FuncSpec(PigStorage.class.getName()));
-        
+
         assertEquals(true, TestHelper.areFilesSame(fSpecExp, fSpecAct, pc));
-        
+
     }
-    
+
     private void setUp5(boolean gen) throws Exception {
         ldFile = "file:" + inpDir + "jsTst5.txt";
         expFile = ldFile;
         stFile = "jsTst5";
         grpName = "jobSubTst5";
-        
+
         if(gen){
             generateInput(1000);
             return;
         }
-        
+
         hadoopLdFile = FileLocalizer.hadoopify(ldFile, pc);
 
         FileSpec LFSpec = new FileSpec(hadoopLdFile, new FuncSpec(PigStorage.class.getName(), new String[]{","}));
@@ -376,35 +377,35 @@ public class TestJobSubmission {
         ld.setLFile(LFSpec);
         st.setPc(pc);
         st.setSFile(SFSpec);
-        
+
         Tuple sample = new DefaultTuple();
         sample.append("S");
         sample.append(1);
         POLocalRearrange lr = GenPhyOp.topLocalRearrangeOPWithPlan(0, 1, sample);
-        
+
         POGlobalRearrange gr = GenPhyOp.topGlobalRearrangeOp();
-        
+
         POPackage pk = GenPhyOp.topPackageOp();
         pk.setKeyType(DataType.INTEGER);
         pk.setNumInps(1);
-        boolean[] inner = {false}; 
+        boolean[] inner = {false};
         pk.setInner(inner);
-        
+
         POForEach fe = GenPhyOp.topForEachOPWithPlan(1);
-        
+
         php.add(ld);
         php.add(lr);
         php.connect(ld, lr);
-        
+
         php.add(gr);
         php.connect(lr, gr);
-        
+
         php.add(pk);
         php.connect(gr, pk);
-        
+
         php.add(fe);
         php.connect(pk, fe);
-        
+
         php.add(st);
         php.connect(fe, st);
      }
@@ -414,54 +415,54 @@ public class TestJobSubmission {
         boolean gen = false;
 
         setUp5(gen);
-        
+
         if(gen)
             return;
-        
+
         submit();
-        
+
         assertEquals(true, FileLocalizer.fileExists(stFile, pc));
-        
+
         FileSpec fSpecExp = new FileSpec(expFile, new FuncSpec(PigStorage.class.getName(), new String[]{","}));
         FileSpec fSpecAct = new FileSpec(stFile,new FuncSpec(PigStorage.class.getName()));
-        
+
         assertEquals(true, TestHelper.areFilesSame(fSpecExp, fSpecAct, pc));
-        
+
     }*/
-    
+
     @Test
     public void testJobControlCompilerErr() throws Exception {
-    	String query = "a = load 'input';" + "b = order a by $0;" + "store b into 'output';";
-    	PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
-    	PhysicalPlan pp = Util.buildPp(pigServer, query);
-    	POStore store = GenPhyOp.dummyPigStorageOp();
-    	pp.addAsLeaf(store);
-    	MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-    	
-    	for(MapReduceOper mro: mrPlan.getLeaves()) {
-    		if(mro.reducePlan != null) {
-    			PhysicalOperator po = mro.reducePlan.getRoots().get(0);
-    			if(po instanceof POPackage) {
-    				((POPackage)po).setKeyType(DataType.BAG);
-    				mro.setGlobalSort(true);
-    			}
-    		}
-    	}
-    	
+        String query = "a = load 'input';" + "b = order a by $0;" + "store b into 'output';";
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PhysicalPlan pp = Util.buildPp(pigServer, query);
+        POStore store = GenPhyOp.dummyPigStorageOp();
+        pp.addAsLeaf(store);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+        
+        for(MapReduceOper mro: mrPlan.getLeaves()) {
+            if(mro.reducePlan != null) {
+                PhysicalOperator po = mro.reducePlan.getRoots().get(0);
+                if(po instanceof POPackage) {
+                    ((POPackage)po).setKeyType(DataType.BAG);
+                    mro.setGlobalSort(true);
+                }
+            }
+        }
+        
         ConfigurationValidator.validatePigProperties(pc.getProperties());
         Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
         JobControlCompiler jcc = new JobControlCompiler(pc, conf);
         try {
-        	jcc.compile(mrPlan, "Test");
+            jcc.compile(mrPlan, "Test");
         } catch (JobCreationException jce) {
             assertTrue(jce.getErrorCode() == 1068);
         }
     }
-    
+
     @Test
     public void testDefaultParallel() throws Throwable {
         pc.defaultParallel = 100;
-        
+
         String query = "a = load 'input';" + "b = group a by $0;" + "store b into 'output';";
         PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         PhysicalPlan pp = Util.buildPp(ps, query);
@@ -470,7 +471,7 @@ public class TestJobSubmission {
         ConfigurationValidator.validatePigProperties(pc.getProperties());
         Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
         JobControlCompiler jcc = new JobControlCompiler(pc, conf);
-        
+
         JobControl jobControl = jcc.compile(mrPlan, "Test");
         Job job = jobControl.getWaitingJobs().get(0);
         int parallel = job.getJobConf().getNumReduceTasks();
@@ -478,7 +479,7 @@ public class TestJobSubmission {
         assertEquals(100, parallel);
         Util.assertParallelValues(100, -1, -1, 100, job.getJobConf());
 
-        pc.defaultParallel = -1;        
+        pc.defaultParallel = -1;
     }
 
     @Test
@@ -503,21 +504,21 @@ public class TestJobSubmission {
         }
         assertEquals(3, counter);
 
-        pc.defaultParallel = -1;        
+        pc.defaultParallel = -1;
     }
-    
+
     @Test
     public void testDefaultParallelInSkewJoin() throws Throwable {
         // default_parallel is considered only at runtime, so here we only test requested parallel
         // more thorough tests can be found in TestNumberOfReducers.java
-        String query = "a = load 'input';" + 
-                       "b = load 'input';" + 
+        String query = "a = load 'input';" +
+                       "b = load 'input';" +
                        "c = join a by $0, b by $0 using 'skewed' parallel 100;" +
                        "store c into 'output';";
         PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         PhysicalPlan pp = Util.buildPp(ps, query);
         MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-        
+
         // Get the skew join job
         Iterator<MapReduceOper> iter = mrPlan.getKeys().values().iterator();
         int counter = 0;
@@ -529,8 +530,8 @@ public class TestJobSubmission {
             }
         }
         assertEquals(3, counter);
-        
-        pc.defaultParallel = -1;        
+
+        pc.defaultParallel = -1;
     }
 
     @Test
@@ -541,18 +542,19 @@ public class TestJobSubmission {
         // use the estimation
         Configuration conf = cluster.getConfiguration();
         HBaseTestingUtility util = new HBaseTestingUtility(conf);
-        util.startMiniZKCluster();
+        int clientPort = util.startMiniZKCluster().getClientPort();
         util.startMiniHBaseCluster(1, 1);
-        
-        String query = "a = load '/passwd';" + 
+
+        String query = "a = load '/passwd';" +
                        "b = group a by $0;" +
                        "store b into 'output';";
         PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         PhysicalPlan pp = Util.buildPp(ps, query);
         MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-               
+
         pc.getConf().setProperty("pig.exec.reducers.bytes.per.reducer", "100");
         pc.getConf().setProperty("pig.exec.reducers.max", "10");
+        pc.getConf().setProperty(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.toString(clientPort));
         ConfigurationValidator.validatePigProperties(pc.getProperties());
         conf = ConfigurationUtil.toConfiguration(pc.getProperties());
         JobControlCompiler jcc = new JobControlCompiler(pc, conf);
@@ -568,7 +570,7 @@ public class TestJobSubmission {
                 "store b into 'output';";
         pp = Util.buildPp(ps, query);
         mrPlan = Util.buildMRPlan(pp, pc);
-               
+
         pc.getConf().setProperty("pig.exec.reducers.bytes.per.reducer", "100");
         pc.getConf().setProperty("pig.exec.reducers.max", "10");
         ConfigurationValidator.validatePigProperties(pc.getProperties());
@@ -581,14 +583,14 @@ public class TestJobSubmission {
 
         final byte[] COLUMNFAMILY = Bytes.toBytes("pig");
         util.createTable(Bytes.toBytesBinary("test_table"), COLUMNFAMILY);
-        
+
         // the estimation won't take effect when it apply to non-dfs or the files doesn't exist, such as hbase
         query = "a = load 'hbase://test_table' using org.apache.pig.backend.hadoop.hbase.HBaseStorage('c:f1 c:f2');" +
                 "b = group a by $0 ;" +
                 "store b into 'output';";
         pp = Util.buildPp(ps, query);
         mrPlan = Util.buildMRPlan(pp, pc);
-                
+
         pc.getConf().setProperty("pig.exec.reducers.bytes.per.reducer", "100");
         pc.getConf().setProperty("pig.exec.reducers.max", "10");
 
@@ -610,14 +612,14 @@ public class TestJobSubmission {
         }
         util.shutdownMiniZKCluster();
     }
-    
+
     @Test
     public void testReducerNumEstimationForOrderBy() throws Exception{
        // use the estimation
         pc.getProperties().setProperty("pig.exec.reducers.bytes.per.reducer", "100");
         pc.getProperties().setProperty("pig.exec.reducers.max", "10");
-        
-        String query = "a = load '/passwd';" + 
+
+        String query = "a = load '/passwd';" +
                        "b = order a by $0;" +
                        "store b into 'output';";
         PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
@@ -634,7 +636,7 @@ public class TestJobSubmission {
         Util.assertParallelValues(-1, 1, -1, 1, jobControl.getWaitingJobs().get(0).getJobConf());
 
         // Simulate the first job having run so estimation kicks in.
-        MapReduceOper sort = mrPlan.getLeaves().get(0);        
+        MapReduceOper sort = mrPlan.getLeaves().get(0);
         jcc.updateMROpPlan(jobControl.getReadyJobs());
         FileLocalizer.create(sort.getQuantFile(), pc);
         jobControl = jcc.compile(mrPlan, query);
@@ -650,39 +652,39 @@ public class TestJobSubmission {
         query = "a = load '/passwd';" + "b = order a by $0 PARALLEL 2;" +
                 "store b into 'output';";
         pp = Util.buildPp(ps, query);
-        
-        mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);               
+
+        mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);
 
         assertEquals(2, mrPlan.size());
-        
-        sort = mrPlan.getLeaves().get(0);        
+
+        sort = mrPlan.getLeaves().get(0);
         assertEquals(2, sort.getRequestedParallelism());
-        
+
         // the estimation won't take effect when it apply to non-dfs or the files doesn't exist, such as hbase
         query = "a = load 'hbase://passwd' using org.apache.pig.backend.hadoop.hbase.HBaseStorage('c:f1 c:f2');" +
                 "b = order a by $0 ;" +
                 "store b into 'output';";
         pp = Util.buildPp(ps, query);
- 
-        mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);               
-        assertEquals(2, mrPlan.size());     
-        
+
+        mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);
+        assertEquals(2, mrPlan.size());
+
         sort = mrPlan.getLeaves().get(0);
-        
+
         // the requested parallel will be -1 if users don't set any of default_parallel, paralllel
         // and the estimation doesn't take effect. MR framework will finally set it to 1.
         assertEquals(-1, sort.getRequestedParallelism());
-        
+
         // test order by with three jobs (after optimization)
         query = "a = load '/passwd';" +
                 "b = foreach a generate $0, $1, $2;" +
                 "c = order b by $0;" +
                 "store c into 'output';";
         pp = Util.buildPp(ps, query);
-        
+
         mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);
-        assertEquals(3, mrPlan.size());     
-        
+        assertEquals(3, mrPlan.size());
+
         // Simulate the first 2 jobs having run so estimation kicks in.
         sort = mrPlan.getLeaves().get(0);
         FileLocalizer.create(sort.getQuantFile(), pc);
@@ -701,7 +703,7 @@ public class TestJobSubmission {
         Util.assertParallelValues(-1, 1, -1, 1, jobControl.getWaitingJobs().get(0).getJobConf());
 
         jobControl = jcc.compile(mrPlan, query);
-        sort = mrPlan.getLeaves().get(0);       
+        sort = mrPlan.getLeaves().get(0);
         assertEquals(reducer, sort.getRequestedParallelism());
 
         //Third job is the order, which uses the estimated number of reducers