Posted to commits@pig.apache.org by sm...@apache.org on 2012/10/24 05:24:39 UTC
svn commit: r1401530 - in /pig/trunk: CHANGES.txt build.xml ivy.xml ivy/libraries.properties test/org/apache/pig/test/TestHBaseStorage.java test/org/apache/pig/test/TestJobSubmission.java
Author: sms
Date: Wed Oct 24 03:24:39 2012
New Revision: 1401530
URL: http://svn.apache.org/viewvc?rev=1401530&view=rev
Log:
PIG-2885: TestJobSubmission and TestHBaseStorage don't work with HBase 0.94 and ZK 3.4.3 (cheolsoo via sms)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/build.xml
pig/trunk/ivy.xml
pig/trunk/ivy/libraries.properties
pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
pig/trunk/test/org/apache/pig/test/TestJobSubmission.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1401530&r1=1401529&r2=1401530&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Oct 24 03:24:39 2012
@@ -38,6 +38,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-2885: TestJobSubmission and TestHBaseStorage don't work with HBase 0.94 and ZK 3.4.3 (cheolsoo via sms)
+
PIG-2928: Fix e2e test failures in trunk: FilterBoolean_23/24 (cheolsoo via dvryaboy)
Release 0.11.0 (unreleased)
Modified: pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/build.xml?rev=1401530&r1=1401529&r2=1401530&view=diff
==============================================================================
--- pig/trunk/build.xml (original)
+++ pig/trunk/build.xml Wed Oct 24 03:24:39 2012
@@ -688,13 +688,12 @@
<include name="jackson-core-asl-${jackson.version}.jar" />
<include name="joda-time-${joda-time.version}.jar" />
<include name="guava-${guava.version}.jar" />
+ <include name="protobuf-java-${protobuf-java.version}.jar" />
<include name="automaton-${automaton.version}.jar" />
<include name="avro-${avro.version}.jar" />
- <include name="hbase-${hbase.version}.jar" />
<include name="commons*.jar" />
<include name="log4j*.jar" />
<include name="jsp-api*.jar" />
- <include name="zookeeper*.jar" />
<include name="jansi-${jansi.version}.jar" />
</zipgroupfileset>
<fileset file="${basedir}/src/pig-default.properties" />
Modified: pig/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/pig/trunk/ivy.xml?rev=1401530&r1=1401529&r2=1401530&view=diff
==============================================================================
--- pig/trunk/ivy.xml (original)
+++ pig/trunk/ivy.xml Wed Oct 24 03:24:39 2012
@@ -172,7 +172,7 @@
conf="compile->master;test->master"/>
<dependency org="commons-cli" name="commons-cli" rev="${commons-cli.version}"
conf="compile->master;checkstyle->master"/>
-
+
<dependency org="org.apache.avro" name="avro" rev="${avro.version}"
conf="compile->default;checkstyle->master"/>
<dependency org="com.googlecode.json-simple" name="json-simple" rev="${json-simple.version}"
@@ -183,7 +183,7 @@
<dependency org="xalan" name="xalan" rev="${xalan.version}"
conf="test->default"/>
<dependency org="xerces" name="xercesImpl" rev="${xerces.version}"
- conf="jdiff->default;test->default"/>
+ conf="jdiff->default;test->default"/>
<dependency org="com.jcraft" name="jsch" rev="${jsch.version}"
conf="compile->master"/>
<dependency org="jline" name="jline" rev="${jline.version}"
@@ -191,7 +191,7 @@
<dependency org="net.java.dev.javacc" name="javacc" rev="${javacc.version}"
conf="compile->master"/>
<dependency org="junit" name="junit" rev="${junit.version}"
- conf="test->master"/>
+ conf="test->master"/>
<dependency org="com.google.code.p.arat" name="rat-lib" rev="${rats-lib.version}"
conf="releaseaudit->default"/>
<dependency org="org.codehaus.groovy" name="groovy-all" rev="${groovy.version}"
@@ -225,20 +225,29 @@
<exclude org="org.apache.ant" module="ant" />
<exclude org="org.slf4j" module="slf4j"/>
<exclude org="org.slf4j" module="slf4j-api"/>
- <exclude org="org.slf4j" module="log4j12"/>
<exclude org="org.slf4j" module="slf4j-log4j12" />
+ <exclude org="org.slf4j" module="log4j12"/>
+ <exclude org="org.slf4j" module="log4j-over-slf4j"/>
<exclude org="stax" module="stax-api" />
<exclude org="javax.xml.bind" module="jaxb-api" />
<exclude org="javax.ws.rs" module="jsr311-api" />
- <exclude org="com.google.protobuf" module="protobuf-java"/>
<exclude org="tomcat" module="jasper-runtime"/>
<exclude org="tomcat" module="jasper-compiler"/>
- <exclude org="org.slf4j" module="log4j-over-slf4j"/>
+ <exclude org="com.google.protobuf" module="protobuf-java"/>
<exclude org="com.sun.jersey" module="jersey-core"/>
<exclude org="com.sun.jersey" module="jersey-server"/>
<exclude org="com.sun.jersey" module="jersey-json"/>
<exclude org="asm" module="asm"/>
</dependency>
+
+ <!-- for TestHBaseStorage -->
+ <dependency org="com.github.stephenc.high-scale-lib" name="high-scale-lib" rev="${high-scale-lib.version}"
+ conf="test->default"/>
+ <dependency org="com.google.protobuf" name="protobuf-java" rev="${protobuf-java.version}"
+ conf="test->default"/>
+ <dependency org="com.yammer.metrics" name="metrics-core" rev="${metrics-core.version}"
+ conf="test->default"/>
+
<!-- for piggybank -->
<dependency org="hsqldb" name="hsqldb" rev="${hsqldb.version}"
conf="test->default" />
@@ -248,7 +257,8 @@
<dependency org="org.vafer" name="jdeb" rev="${jdeb.version}"
conf="compile->master">
</dependency>
-
+
<dependency org="org.mockito" name="mockito-all" rev="${mockito.version}" conf="compile->default"/>
</dependencies>
</ivy-module>
+
Modified: pig/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/trunk/ivy/libraries.properties?rev=1401530&r1=1401529&r2=1401530&view=diff
==============================================================================
--- pig/trunk/ivy/libraries.properties (original)
+++ pig/trunk/ivy/libraries.properties Wed Oct 24 03:24:39 2012
@@ -40,7 +40,7 @@ hadoop-test.version=1.0.0
hadoop-common.version=2.0.0-alpha
hadoop-hdfs.version=2.0.0-alpha
hadoop-mapreduce.version=2.0.0-alpha
-hbase.version=0.90.0
+hbase.version=0.94.1
hsqldb.version=1.8.0.10
hive.version=0.8.0
httpcomponents.version=4.1
@@ -71,13 +71,15 @@ slf4j-log4j12.version=1.6.1
xerces.version=2.10.0
xalan.version=2.7.1
wagon-http.version=1.0-beta-2
-zookeeper.version=3.3.3
+zookeeper.version=3.4.4
servlet.version=4.0.6
servlet-api.version=2.5
protobuf-java.version=2.4.0a
+high-scale-lib.version=1.1.1
+metrics-core.version=2.1.2
guice.version=3.0
guice-servlet.version=3.0
aopalliance.version=1.0
jsr311-api.version=1.1.1
mockito.version=1.8.4
-jansi.version=1.9
\ No newline at end of file
+jansi.version=1.9
Modified: pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java?rev=1401530&r1=1401529&r2=1401530&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java Wed Oct 24 03:24:39 2012
@@ -5,9 +5,9 @@
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -57,7 +57,7 @@ public class TestHBaseStorage {
private static final Log LOG = LogFactory.getLog(TestHBaseStorage.class);
private static HBaseTestingUtility util;
private static Configuration conf;
- private static MiniCluster cluster;
+ private static MiniCluster cluster;
private static PigServer pig;
final static int NUM_REGIONSERVERS = 1;
@@ -122,7 +122,7 @@ public class TestHBaseStorage {
// DVR: I've found that it is faster to delete all rows in small tables
// than to drop them.
private void deleteAllRows(String tableName) throws Exception {
- HTable table = new HTable(tableName);
+ HTable table = new HTable(conf, tableName);
ResultScanner scanner = table.getScanner(new Scan());
List<Delete> deletes = Lists.newArrayList();
for (Result row : scanner) {
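
[The HTable change in this hunk, repeated in the store tests and prepareTable below, follows a single pattern: under HBase 0.94 the tests pass the shared cluster Configuration to HTable explicitly instead of letting the old no-Configuration constructor build its own, which is not guaranteed to see the mini cluster's settings. A minimal standalone sketch of that pattern, with a hypothetical table name:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;

public class HTableConfSketch {
    public static void main(String[] args) throws Exception {
        // In the tests this conf comes from the mini cluster;
        // HBaseConfiguration.create() stands in for it here.
        Configuration conf = HBaseConfiguration.create();
        // HBase 0.94: pass the Configuration explicitly.
        // Previously: new HTable("example_table") created its own conf.
        HTable table = new HTable(conf, "example_table"); // hypothetical table
        table.close();
    }
}
]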
@@ -191,23 +191,23 @@ public class TestHBaseStorage {
@Test
public void testLoadWithSpecifiedTimestampAndRanges() throws IOException {
long beforeTimeStamp = System.currentTimeMillis() - 10;
-
+
HTable table = prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
long afterTimeStamp = System.currentTimeMillis() + 10;
-
+
Assert.assertEquals("MaxTimestamp is set before rows added", 0, queryWithTimestamp(null , beforeTimeStamp, null));
-
+
Assert.assertEquals("MaxTimestamp is set after rows added", TEST_ROW_COUNT, queryWithTimestamp(null, afterTimeStamp, null));
-
+
Assert.assertEquals("MinTimestamp is set after rows added", 0, queryWithTimestamp(afterTimeStamp, null, null));
-
+
Assert.assertEquals("MinTimestamp is set before rows added", TEST_ROW_COUNT, queryWithTimestamp(beforeTimeStamp, null, null));
-
+
Assert.assertEquals("Timestamp range is set around rows added", TEST_ROW_COUNT, queryWithTimestamp(beforeTimeStamp, afterTimeStamp, null));
-
+
Assert.assertEquals("Timestamp range is set after rows added", 0, queryWithTimestamp(afterTimeStamp, afterTimeStamp + 10, null));
-
+
Assert.assertEquals("Timestamp range is set before rows added", 0, queryWithTimestamp(beforeTimeStamp - 10, beforeTimeStamp, null));
Assert.assertEquals("Timestamp is set before rows added", 0, queryWithTimestamp(null, null, beforeTimeStamp));
@@ -215,22 +215,22 @@ public class TestHBaseStorage {
Assert.assertEquals("Timestamp is set after rows added", 0, queryWithTimestamp(null, null, afterTimeStamp));
long specifiedTimestamp = table.get(new Get(Bytes.toBytes("00"))).getColumnLatest(COLUMNFAMILY, Bytes.toBytes("col_a")).getTimestamp();
-
+
Assert.assertTrue("Timestamp is set equals to row 01", queryWithTimestamp(null, null, specifiedTimestamp) > 0);
-
-
+
+
LOG.info("LoadFromHBase done");
}
private int queryWithTimestamp(Long minTimestamp, Long maxTimestamp, Long timestamp) throws IOException,
ExecException {
-
+
StringBuilder extraParams = new StringBuilder();
-
+
if (minTimestamp != null){
extraParams.append(" -minTimestamp " + minTimestamp + " ");
}
-
+
if (maxTimestamp != null){
extraParams.append(" -maxTimestamp " + maxTimestamp + " ");
}
@@ -238,8 +238,8 @@ public class TestHBaseStorage {
if (timestamp != null){
extraParams.append(" -timestamp " + timestamp + " ");
}
-
-
+
+
pig.registerQuery("a = load 'hbase://"
+ TESTTABLE_1
+ "' using "
@@ -396,7 +396,7 @@ public class TestHBaseStorage {
Assert.assertEquals(TEST_ROW_COUNT, count);
LOG.info("LoadFromHBase done");
}
-
+
/**
* * Test Load from hbase with map parameters and with a
* static column
@@ -438,7 +438,7 @@ public class TestHBaseStorage {
/**
* load from hbase test
- *
+ *
* @throws IOException
*/
@Test
@@ -473,7 +473,7 @@ public class TestHBaseStorage {
/**
* load from hbase test without hbase:// prefix
- *
+ *
* @throws IOException
*/
@Test
@@ -503,7 +503,7 @@ public class TestHBaseStorage {
/**
* load from hbase test including the row key as the first column
- *
+ *
* @throws IOException
*/
@Test
@@ -537,7 +537,7 @@ public class TestHBaseStorage {
/**
* Test Load from hbase with parameters lte and gte (01<=key<=98)
- *
+ *
*/
@Test
public void testLoadWithParameters_1() throws IOException {
@@ -732,7 +732,7 @@ public class TestHBaseStorage {
/**
* load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it into
* 'TESTTABLE_2' using HBaseBinaryFormat
- *
+ *
* @throws IOException
*/
@Test
@@ -745,7 +745,7 @@ public class TestHBaseStorage {
"org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
+ TESTCOLUMN_C + "','-caster HBaseBinaryConverter')");
- HTable table = new HTable(TESTTABLE_2);
+ HTable table = new HTable(conf, TESTTABLE_2);
ResultScanner scanner = table.getScanner(new Scan());
Iterator<Result> iter = scanner.iterator();
int i = 0;
@@ -783,7 +783,7 @@ public class TestHBaseStorage {
+ TESTCOLUMN_A + " " + TESTCOLUMN_B +
"','-caster HBaseBinaryConverter')");
- HTable table = new HTable(TESTTABLE_2);
+ HTable table = new HTable(conf, TESTTABLE_2);
ResultScanner scanner = table.getScanner(new Scan());
Iterator<Result> iter = scanner.iterator();
int i = 0;
@@ -804,7 +804,7 @@ public class TestHBaseStorage {
/**
* load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it into
* 'TESTTABLE_2' using UTF-8 Plain Text format
- *
+ *
* @throws IOException
*/
@Test
@@ -817,7 +817,7 @@ public class TestHBaseStorage {
+ TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
+ TESTCOLUMN_C + "')");
- HTable table = new HTable(TESTTABLE_2);
+ HTable table = new HTable(conf, TESTTABLE_2);
ResultScanner scanner = table.getScanner(new Scan());
Iterator<Result> iter = scanner.iterator();
int i = 0;
@@ -881,7 +881,7 @@ public class TestHBaseStorage {
"org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ TESTCOLUMN_A + " " + TESTCOLUMN_B + "')");
- HTable table = new HTable(TESTTABLE_2);
+ HTable table = new HTable(conf, TESTTABLE_2);
ResultScanner scanner = table.getScanner(new Scan());
Iterator<Result> iter = scanner.iterator();
int i = 0;
@@ -915,7 +915,7 @@ public class TestHBaseStorage {
"org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ TESTCOLUMN_A + " " + TESTCOLUMN_B + "')");
- HTable table = new HTable(TESTTABLE_2);
+ HTable table = new HTable(conf, TESTTABLE_2);
ResultScanner scanner = table.getScanner(new Scan());
Iterator<Result> iter = scanner.iterator();
int i = 0;
@@ -994,7 +994,7 @@ public class TestHBaseStorage {
/**
* Prepare a table in hbase for testing.
- *
+ *
*/
private HTable prepareTable(String tableName, boolean initData,
DataFormat format) throws IOException {
@@ -1009,7 +1009,7 @@ public class TestHBaseStorage {
table = util.createTable(Bytes.toBytesBinary(tableName),
COLUMNFAMILY);
} catch (Exception e) {
- table = new HTable(Bytes.toBytesBinary(tableName));
+ table = new HTable(conf, Bytes.toBytesBinary(tableName));
}
if (initData) {
Modified: pig/trunk/test/org/apache/pig/test/TestJobSubmission.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java?rev=1401530&r1=1401529&r2=1401530&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestJobSubmission.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestJobSubmission.java Wed Oct 24 03:24:39 2012
@@ -26,6 +26,7 @@ import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.jobcontrol.Job;
@@ -55,8 +56,8 @@ import org.junit.BeforeClass;
import org.junit.Test;
public class TestJobSubmission {
-
-
+
+
static PigContext pc;
String ldFile;
String expFile;
@@ -69,7 +70,7 @@ public class TestJobSubmission {
String inpDir;
String golDir;
static MiniCluster cluster = MiniCluster.buildCluster();
-
+
@BeforeClass
public static void onetimeSetUp() throws Exception {
pc = new PigContext(ExecType.MAPREDUCE, cluster.getProperties());
@@ -81,11 +82,11 @@ public class TestJobSubmission {
}
GenPhyOp.setPc(pc);
Util.copyFromLocalToCluster(cluster, "test/org/apache/pig/test/data/passwd", "/passwd");
-
+
Configuration conf = cluster.getConfiguration();
-
+
}
-
+
@Before
public void setUp() throws Exception{
curDir = System.getProperty("user.dir");
@@ -96,20 +97,20 @@ public class TestJobSubmission {
if ((System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")))
golDir="/"+FileLocalizer.parseCygPath(golDir, FileLocalizer.STYLE_WINDOWS);
}
-
+
@After
public void tearDown() throws Exception {
}
-
+
@AfterClass
public static void oneTimeTearDown() throws Exception {
cluster.shutDown();
}
-
+
/* private void generateInput(int numTuples) throws ExecException{
-
+
DataBag inpDb = GenRandomData.genRandSmallTupDataBag(r, numTuples, 1000);
-
+
POProject proj = new POProject(new OperatorKey("", r.nextLong()));
Tuple t = new DefaultTuple();
t.append(inpDb);
@@ -117,31 +118,31 @@ public class TestJobSubmission {
proj.setColumn(0);
proj.setOverloaded(true);
proj.setResultType(DataType.TUPLE);
-
+
List<PhysicalOperator> inps = new ArrayList<PhysicalOperator>();
inps.add(proj);
-
+
POStore str = new POStore(new OperatorKey("", r.nextLong()));
str.setInputs(inps);
-
+
FileSpec fSpec = new FileSpec(ldFile, new FuncSpec(PigStorage.class.getName()));
-
+
str.setSFile(fSpec);
str.setPc(pc);
str.store();
}
-
+
private void setUp1(boolean gen) throws Exception {
ldFile = "file:" + inpDir + "jsTst1.txt";
expFile = ldFile;
stFile = "jsTst1";
grpName = "jobSubTst1";
-
+
if(gen){
generateInput(100);
return;
}
-
+
hadoopLdFile = FileLocalizer.hadoopify(ldFile, pc);
FileSpec LFSpec = new FileSpec(hadoopLdFile,new FuncSpec(PigStorage.class.getName()));
@@ -153,7 +154,7 @@ public class TestJobSubmission {
ld.setLFile(LFSpec);
st.setPc(pc);
st.setSFile(SFSpec);
-
+
php.add(ld);
php.add(st);
php.connect(ld, st);
@@ -164,31 +165,31 @@ public class TestJobSubmission {
boolean gen = false;
setUp1(gen);
-
+
if(gen)
return;
submit();
-
+
assertEquals(true, FileLocalizer.fileExists(stFile, pc));
-
+
FileSpec fSpecExp = new FileSpec(expFile, new FuncSpec(PigStorage.class.getName()));
FileSpec fSpecAct = new FileSpec(stFile, new FuncSpec(PigStorage.class.getName()));
-
+
assertEquals(true, TestHelper.areFilesSame(fSpecExp, fSpecAct, pc));
}
-
+
private void setUp2(boolean gen) throws Exception {
ldFile = "file:" + inpDir + "jsTst2.txt";
expFile = ldFile;
stFile = "jsTst2";
grpName = "jobSubTst2";
-
+
if(gen){
generateInput(1000);
return;
}
-
+
hadoopLdFile = FileLocalizer.hadoopify(ldFile, pc);
FileSpec LFSpec = new FileSpec(hadoopLdFile, new FuncSpec(PigStorage.class.getName()));
@@ -200,7 +201,7 @@ public class TestJobSubmission {
ld.setLFile(LFSpec);
st.setPc(pc);
st.setSFile(SFSpec);
-
+
php.add(ld);
php.add(st);
php.connect(ld, st);
@@ -211,31 +212,31 @@ public class TestJobSubmission {
boolean gen = false;
setUp2(gen);
-
+
if(gen)
return;
submit();
-
+
assertEquals(true, FileLocalizer.fileExists(stFile, pc));
-
+
FileSpec fSpecExp = new FileSpec(expFile,new FuncSpec(PigStorage.class.getName()));
FileSpec fSpecAct = new FileSpec(stFile,new FuncSpec(PigStorage.class.getName()));
-
+
assertEquals(true, TestHelper.areFilesSame(fSpecExp, fSpecAct, pc));
}
-
+
private void setUp3(boolean gen) throws Exception {
ldFile = "file:" + inpDir + "jsTst1.txt";
expFile = "file:" + golDir + "jsTst3";
stFile = "jsTst3";
grpName = "jobSubTst3";
-
+
if(gen){
generateInput(1000);
return;
}
-
+
hadoopLdFile = FileLocalizer.hadoopify(ldFile, pc);
FileSpec LFSpec = new FileSpec(hadoopLdFile, new FuncSpec(PigStorage.class.getName()));
@@ -247,23 +248,23 @@ public class TestJobSubmission {
ld.setLFile(LFSpec);
st.setPc(pc);
st.setSFile(SFSpec);
-
+
int[] flds = {0,1};
Tuple sample = new DefaultTuple();
sample.append(new String("S"));
sample.append(new Integer("10"));
-
+
POForEach fe = GenPhyOp.topForEachOPWithPlan(flds , sample);
-
+
POFilter fl = GenPhyOp.topFilterOpWithProj(1, 500, GenPhyOp.LT);
-
+
php.add(ld);
php.add(fe);
php.connect(ld, fe);
-
+
php.add(fl);
php.connect(fe, fl);
-
+
php.add(st);
php.connect(fl, st);
}
@@ -273,31 +274,31 @@ public class TestJobSubmission {
boolean gen = false;
setUp3(gen);
-
+
if(gen)
return;
submit();
-
+
assertEquals(true, FileLocalizer.fileExists(stFile, pc));
-
+
FileSpec fSpecExp = new FileSpec(expFile, new FuncSpec(PigStorage.class.getName(), new String[]{","}));
FileSpec fSpecAct = new FileSpec(stFile,new FuncSpec(PigStorage.class.getName()));
-
+
assertEquals(true, TestHelper.areFilesSame(fSpecExp, fSpecAct, pc));
}
-
+
private void setUp4(boolean gen) throws Exception {
ldFile = "file:" + inpDir + "jsTst1.txt";
expFile = "file:" + golDir + "jsTst4";
stFile = "jsTst4";
grpName = "jobSubTst4";
-
+
if(gen){
generateInput(1000);
return;
}
-
+
hadoopLdFile = FileLocalizer.hadoopify(ldFile, pc);
FileSpec LFSpec = new FileSpec(hadoopLdFile,new FuncSpec(PigStorage.class.getName()));
@@ -309,27 +310,27 @@ public class TestJobSubmission {
ld.setLFile(LFSpec);
st.setPc(pc);
st.setSFile(SFSpec);
-
+
POSplit spl = GenPhyOp.topSplitOp();
POFilter fl1 = GenPhyOp.topFilterOpWithProjWithCast(1, 200, GenPhyOp.LT);
POFilter fl2 = GenPhyOp.topFilterOpWithProjWithCast(1, 800, GenPhyOp.GT);
-
+
POUnion un = GenPhyOp.topUnionOp();
-
+
php.add(ld);
php.add(spl);
php.connect(ld, spl);
-
+
php.add(fl1);
php.connect(spl, fl1);
-
+
php.add(fl2);
php.connect(spl, fl2);
-
+
php.add(un);
php.connect(fl1, un);
php.connect(fl2, un);
-
+
php.add(st);
php.connect(un, st);
}
@@ -339,32 +340,32 @@ public class TestJobSubmission {
boolean gen = false;
setUp4(gen);
-
+
if(gen)
return;
-
+
submit();
-
+
assertEquals(true, FileLocalizer.fileExists(stFile, pc));
-
+
FileSpec fSpecExp = new FileSpec(expFile, new FuncSpec(PigStorage.class.getName(), new String[]{","}));
FileSpec fSpecAct = new FileSpec(stFile,new FuncSpec(PigStorage.class.getName()));
-
+
assertEquals(true, TestHelper.areFilesSame(fSpecExp, fSpecAct, pc));
-
+
}
-
+
private void setUp5(boolean gen) throws Exception {
ldFile = "file:" + inpDir + "jsTst5.txt";
expFile = ldFile;
stFile = "jsTst5";
grpName = "jobSubTst5";
-
+
if(gen){
generateInput(1000);
return;
}
-
+
hadoopLdFile = FileLocalizer.hadoopify(ldFile, pc);
FileSpec LFSpec = new FileSpec(hadoopLdFile, new FuncSpec(PigStorage.class.getName(), new String[]{","}));
@@ -376,35 +377,35 @@ public class TestJobSubmission {
ld.setLFile(LFSpec);
st.setPc(pc);
st.setSFile(SFSpec);
-
+
Tuple sample = new DefaultTuple();
sample.append("S");
sample.append(1);
POLocalRearrange lr = GenPhyOp.topLocalRearrangeOPWithPlan(0, 1, sample);
-
+
POGlobalRearrange gr = GenPhyOp.topGlobalRearrangeOp();
-
+
POPackage pk = GenPhyOp.topPackageOp();
pk.setKeyType(DataType.INTEGER);
pk.setNumInps(1);
- boolean[] inner = {false};
+ boolean[] inner = {false};
pk.setInner(inner);
-
+
POForEach fe = GenPhyOp.topForEachOPWithPlan(1);
-
+
php.add(ld);
php.add(lr);
php.connect(ld, lr);
-
+
php.add(gr);
php.connect(lr, gr);
-
+
php.add(pk);
php.connect(gr, pk);
-
+
php.add(fe);
php.connect(pk, fe);
-
+
php.add(st);
php.connect(fe, st);
}
@@ -414,54 +415,54 @@ public class TestJobSubmission {
boolean gen = false;
setUp5(gen);
-
+
if(gen)
return;
-
+
submit();
-
+
assertEquals(true, FileLocalizer.fileExists(stFile, pc));
-
+
FileSpec fSpecExp = new FileSpec(expFile, new FuncSpec(PigStorage.class.getName(), new String[]{","}));
FileSpec fSpecAct = new FileSpec(stFile,new FuncSpec(PigStorage.class.getName()));
-
+
assertEquals(true, TestHelper.areFilesSame(fSpecExp, fSpecAct, pc));
-
+
}*/
-
+
@Test
public void testJobControlCompilerErr() throws Exception {
- String query = "a = load 'input';" + "b = order a by $0;" + "store b into 'output';";
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
- PhysicalPlan pp = Util.buildPp(pigServer, query);
- POStore store = GenPhyOp.dummyPigStorageOp();
- pp.addAsLeaf(store);
- MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-
- for(MapReduceOper mro: mrPlan.getLeaves()) {
- if(mro.reducePlan != null) {
- PhysicalOperator po = mro.reducePlan.getRoots().get(0);
- if(po instanceof POPackage) {
- ((POPackage)po).setKeyType(DataType.BAG);
- mro.setGlobalSort(true);
- }
- }
- }
-
+ String query = "a = load 'input';" + "b = order a by $0;" + "store b into 'output';";
+ PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PhysicalPlan pp = Util.buildPp(pigServer, query);
+ POStore store = GenPhyOp.dummyPigStorageOp();
+ pp.addAsLeaf(store);
+ MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+ for(MapReduceOper mro: mrPlan.getLeaves()) {
+ if(mro.reducePlan != null) {
+ PhysicalOperator po = mro.reducePlan.getRoots().get(0);
+ if(po instanceof POPackage) {
+ ((POPackage)po).setKeyType(DataType.BAG);
+ mro.setGlobalSort(true);
+ }
+ }
+ }
+
ConfigurationValidator.validatePigProperties(pc.getProperties());
Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
JobControlCompiler jcc = new JobControlCompiler(pc, conf);
try {
- jcc.compile(mrPlan, "Test");
+ jcc.compile(mrPlan, "Test");
} catch (JobCreationException jce) {
assertTrue(jce.getErrorCode() == 1068);
}
}
-
+
@Test
public void testDefaultParallel() throws Throwable {
pc.defaultParallel = 100;
-
+
String query = "a = load 'input';" + "b = group a by $0;" + "store b into 'output';";
PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
PhysicalPlan pp = Util.buildPp(ps, query);
@@ -470,7 +471,7 @@ public class TestJobSubmission {
ConfigurationValidator.validatePigProperties(pc.getProperties());
Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
JobControlCompiler jcc = new JobControlCompiler(pc, conf);
-
+
JobControl jobControl = jcc.compile(mrPlan, "Test");
Job job = jobControl.getWaitingJobs().get(0);
int parallel = job.getJobConf().getNumReduceTasks();
@@ -478,7 +479,7 @@ public class TestJobSubmission {
assertEquals(100, parallel);
Util.assertParallelValues(100, -1, -1, 100, job.getJobConf());
- pc.defaultParallel = -1;
+ pc.defaultParallel = -1;
}
@Test
@@ -503,21 +504,21 @@ public class TestJobSubmission {
}
assertEquals(3, counter);
- pc.defaultParallel = -1;
+ pc.defaultParallel = -1;
}
-
+
@Test
public void testDefaultParallelInSkewJoin() throws Throwable {
// default_parallel is considered only at runtime, so here we only test requested parallel
// more thorough tests can be found in TestNumberOfReducers.java
- String query = "a = load 'input';" +
- "b = load 'input';" +
+ String query = "a = load 'input';" +
+ "b = load 'input';" +
"c = join a by $0, b by $0 using 'skewed' parallel 100;" +
"store c into 'output';";
PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
PhysicalPlan pp = Util.buildPp(ps, query);
MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-
+
// Get the skew join job
Iterator<MapReduceOper> iter = mrPlan.getKeys().values().iterator();
int counter = 0;
@@ -529,8 +530,8 @@ public class TestJobSubmission {
}
}
assertEquals(3, counter);
-
- pc.defaultParallel = -1;
+
+ pc.defaultParallel = -1;
}
@Test
@@ -541,18 +542,19 @@ public class TestJobSubmission {
// use the estimation
Configuration conf = cluster.getConfiguration();
HBaseTestingUtility util = new HBaseTestingUtility(conf);
- util.startMiniZKCluster();
+ int clientPort = util.startMiniZKCluster().getClientPort();
util.startMiniHBaseCluster(1, 1);
-
- String query = "a = load '/passwd';" +
+
+ String query = "a = load '/passwd';" +
"b = group a by $0;" +
"store b into 'output';";
PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
PhysicalPlan pp = Util.buildPp(ps, query);
MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-
+
pc.getConf().setProperty("pig.exec.reducers.bytes.per.reducer", "100");
pc.getConf().setProperty("pig.exec.reducers.max", "10");
+ pc.getConf().setProperty(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.toString(clientPort));
ConfigurationValidator.validatePigProperties(pc.getProperties());
conf = ConfigurationUtil.toConfiguration(pc.getProperties());
JobControlCompiler jcc = new JobControlCompiler(pc, conf);
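
[The clientPort capture above is the core of the ZooKeeper 3.4.x fix: the mini ZK cluster no longer comes up on a predictable port, so the test reads the actual port from startMiniZKCluster() and publishes it via HConstants.ZOOKEEPER_CLIENT_PORT ("hbase.zookeeper.property.clientPort"), letting jobs compiled from pc.getProperties() reach the right ZooKeeper. A standalone sketch of the pattern, assuming only the HBase test utility:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;

public class MiniZkPortSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseTestingUtility util = new HBaseTestingUtility(conf);
        // ZK 3.4.x: the mini cluster picks its client port at startup,
        // so capture it instead of assuming a fixed default.
        int clientPort = util.startMiniZKCluster().getClientPort();
        // Propagate the port to anything that builds its own HBase/ZK config.
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.toString(clientPort));
        util.startMiniHBaseCluster(1, 1);
        try {
            // ... exercise HBase-backed code here ...
        } finally {
            util.shutdownMiniHBaseCluster();
            util.shutdownMiniZKCluster();
        }
    }
}
]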
@@ -568,7 +570,7 @@ public class TestJobSubmission {
"store b into 'output';";
pp = Util.buildPp(ps, query);
mrPlan = Util.buildMRPlan(pp, pc);
-
+
pc.getConf().setProperty("pig.exec.reducers.bytes.per.reducer", "100");
pc.getConf().setProperty("pig.exec.reducers.max", "10");
ConfigurationValidator.validatePigProperties(pc.getProperties());
@@ -581,14 +583,14 @@ public class TestJobSubmission {
final byte[] COLUMNFAMILY = Bytes.toBytes("pig");
util.createTable(Bytes.toBytesBinary("test_table"), COLUMNFAMILY);
-
+
// the estimation won't take effect when it apply to non-dfs or the files doesn't exist, such as hbase
query = "a = load 'hbase://test_table' using org.apache.pig.backend.hadoop.hbase.HBaseStorage('c:f1 c:f2');" +
"b = group a by $0 ;" +
"store b into 'output';";
pp = Util.buildPp(ps, query);
mrPlan = Util.buildMRPlan(pp, pc);
-
+
pc.getConf().setProperty("pig.exec.reducers.bytes.per.reducer", "100");
pc.getConf().setProperty("pig.exec.reducers.max", "10");
@@ -610,14 +612,14 @@ public class TestJobSubmission {
}
util.shutdownMiniZKCluster();
}
-
+
@Test
public void testReducerNumEstimationForOrderBy() throws Exception{
// use the estimation
pc.getProperties().setProperty("pig.exec.reducers.bytes.per.reducer", "100");
pc.getProperties().setProperty("pig.exec.reducers.max", "10");
-
- String query = "a = load '/passwd';" +
+
+ String query = "a = load '/passwd';" +
"b = order a by $0;" +
"store b into 'output';";
PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
@@ -634,7 +636,7 @@ public class TestJobSubmission {
Util.assertParallelValues(-1, 1, -1, 1, jobControl.getWaitingJobs().get(0).getJobConf());
// Simulate the first job having run so estimation kicks in.
- MapReduceOper sort = mrPlan.getLeaves().get(0);
+ MapReduceOper sort = mrPlan.getLeaves().get(0);
jcc.updateMROpPlan(jobControl.getReadyJobs());
FileLocalizer.create(sort.getQuantFile(), pc);
jobControl = jcc.compile(mrPlan, query);
@@ -650,39 +652,39 @@ public class TestJobSubmission {
query = "a = load '/passwd';" + "b = order a by $0 PARALLEL 2;" +
"store b into 'output';";
pp = Util.buildPp(ps, query);
-
- mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);
+
+ mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);
assertEquals(2, mrPlan.size());
-
- sort = mrPlan.getLeaves().get(0);
+
+ sort = mrPlan.getLeaves().get(0);
assertEquals(2, sort.getRequestedParallelism());
-
+
// the estimation won't take effect when it apply to non-dfs or the files doesn't exist, such as hbase
query = "a = load 'hbase://passwd' using org.apache.pig.backend.hadoop.hbase.HBaseStorage('c:f1 c:f2');" +
"b = order a by $0 ;" +
"store b into 'output';";
pp = Util.buildPp(ps, query);
-
- mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);
- assertEquals(2, mrPlan.size());
-
+
+ mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);
+ assertEquals(2, mrPlan.size());
+
sort = mrPlan.getLeaves().get(0);
-
+
// the requested parallel will be -1 if users don't set any of default_parallel, paralllel
// and the estimation doesn't take effect. MR framework will finally set it to 1.
assertEquals(-1, sort.getRequestedParallelism());
-
+
// test order by with three jobs (after optimization)
query = "a = load '/passwd';" +
"b = foreach a generate $0, $1, $2;" +
"c = order b by $0;" +
"store c into 'output';";
pp = Util.buildPp(ps, query);
-
+
mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);
- assertEquals(3, mrPlan.size());
-
+ assertEquals(3, mrPlan.size());
+
// Simulate the first 2 jobs having run so estimation kicks in.
sort = mrPlan.getLeaves().get(0);
FileLocalizer.create(sort.getQuantFile(), pc);
@@ -701,7 +703,7 @@ public class TestJobSubmission {
Util.assertParallelValues(-1, 1, -1, 1, jobControl.getWaitingJobs().get(0).getJobConf());
jobControl = jcc.compile(mrPlan, query);
- sort = mrPlan.getLeaves().get(0);
+ sort = mrPlan.getLeaves().get(0);
assertEquals(reducer, sort.getRequestedParallelism());
//Third job is the order, which uses the estimated number of reducers