You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/22 09:43:46 UTC
svn commit: r1783988 [17/24] - in /pig/branches/spark: ./ bin/ conf/
contrib/piggybank/java/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachel...
Modified: pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java Wed Feb 22 09:43:41 2017
@@ -71,12 +71,16 @@ public class TestHBaseStorage {
private static final String TESTTABLE_1 = "pigtable_1";
private static final String TESTTABLE_2 = "pigtable_2";
private static final byte[] COLUMNFAMILY = Bytes.toBytes("pig");
+ private static final byte[] COLUMNFAMILY2 = Bytes.toBytes("pig2");
private static final String TESTCOLUMN_A = "pig:col_a";
private static final String TESTCOLUMN_B = "pig:col_b";
private static final String TESTCOLUMN_C = "pig:col_c";
private static final int TEST_ROW_COUNT = 100;
+ private enum TableType {ONE_CF, TWO_CF} // prepareTable() creates family "pig" only (ONE_CF) or "pig" and "pig2" (TWO_CF)
+ private TableType lastTableType; // NOTE(review): instance field, so under JUnit 4 (new instance per test) it is null when each test starts and the first prepareTable() call always drops the table via util.deleteTable()
+
@BeforeClass
public static void setUp() throws Exception {
// This is needed by Pig
@@ -313,13 +317,13 @@ public class TestHBaseStorage {
*/
@Test
public void testLoadWithMap_3_col_prefix() throws IOException {
- prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+ prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText, TableType.TWO_CF);
pig.registerQuery("a = load 'hbase://"
+ TESTTABLE_1
+ "' using "
+ "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
- + "pig:col_* pig:prefixed_col_*"
+ + "pig2:* pig:prefixed_col_*"
+ "','-loadKey') as (rowKey:chararray, pig_cf_map:map[], pig_prefix_cf_map:map[]);");
Iterator<Tuple> it = pig.openIterator("a");
int count = 0;
@@ -328,24 +332,18 @@ public class TestHBaseStorage {
Tuple t = it.next();
LOG.info("LoadFromHBase " + t);
String rowKey = t.get(0).toString();
- Map pig_cf_map = (Map) t.get(1);
+ Map pig_secondery_cf_map = (Map) t.get(1);
Map pig_prefix_cf_map = (Map) t.get(2);
Assert.assertEquals(3, t.size());
Assert.assertEquals("00".substring((count + "").length()) + count,
rowKey);
+ Assert.assertEquals(count,
+ Integer.parseInt(pig_secondery_cf_map.get("col_x").toString()));
Assert.assertEquals("PrefixedText_" + count,
((DataByteArray) pig_prefix_cf_map.get("prefixed_col_d")).toString());
Assert.assertEquals(1, pig_prefix_cf_map.size());
- Assert.assertEquals(count,
- Integer.parseInt(pig_cf_map.get("col_a").toString()));
- Assert.assertEquals(count + 0.0,
- Double.parseDouble(pig_cf_map.get("col_b").toString()), 1e-6);
- Assert.assertEquals("Text_" + count,
- ((DataByteArray) pig_cf_map.get("col_c")).toString());
- Assert.assertEquals(3, pig_cf_map.size());
-
count++;
}
Assert.assertEquals(TEST_ROW_COUNT, count);
@@ -434,6 +432,39 @@ public class TestHBaseStorage {
LOG.info("LoadFromHBase done");
}
+ @Test
+ public void testLoadWithFixedAndPrefixedCols3() throws IOException {
+ prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+ pig.registerQuery("a = load 'hbase://"
+ + TESTTABLE_1
+ + "' using "
+ + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ + "pig:* pig:prefixed_col_*"
+ + "','-loadKey') as (rowKey:chararray, pig_cf_map:map[], pig_prefix_cf_map:map[]);");
+ Iterator<Tuple> it = pig.openIterator("a");
+ int count = 0;
+ LOG.info("LoadFromHBase Starting");
+ while (it.hasNext()) {
+ Tuple t = it.next();
+ LOG.info("LoadFromHBase " + t);
+ String rowKey = (String) t.get(0);
+ Map pig_cf_map = (Map) t.get(1);
+ Map pig_prefix_cf_map = (Map) t.get(2);
+ Assert.assertEquals(3, t.size());
+ // "pig:*" and "pig:prefixed_col_*" share family "pig": the "*" map is expected to hold only prefixed_col_d, the prefix map a single entry
+ Assert.assertEquals("00".substring((count + "").length()) + count,
+ rowKey);
+ Assert.assertEquals("PrefixedText_" + count,
+ ((DataByteArray) pig_cf_map.get("prefixed_col_d")).toString());
+ Assert.assertEquals(1, pig_cf_map.size());
+ Assert.assertEquals(1, pig_prefix_cf_map.size());
+
+ count++;
+ }
+ Assert.assertEquals(TEST_ROW_COUNT, count);
+ LOG.info("LoadFromHBase done");
+ }
+
/**
* * Test Load from hbase with map parameters and with a
* static column in different order
@@ -1486,22 +1517,36 @@ public class TestHBaseStorage {
+ "') as (rowKey:chararray,col_a:int, col_b:double, col_c:chararray);");
}
+ private HTable prepareTable(String tableName, boolean initData,
+ DataFormat format) throws IOException {
+ return prepareTable(tableName, initData, format, TableType.ONE_CF);
+ }
/**
* Prepare a table in hbase for testing.
*
*/
private HTable prepareTable(String tableName, boolean initData,
- DataFormat format) throws IOException {
+ DataFormat format, TableType type) throws IOException {
// define the table schema
HTable table = null;
try {
- deleteAllRows(tableName);
+ if (lastTableType == type) {
+ deleteAllRows(tableName);
+ } else {
+ util.deleteTable(tableName);
+ }
} catch (Exception e) {
// It's ok, table might not exist.
}
try {
- table = util.createTable(Bytes.toBytesBinary(tableName),
- COLUMNFAMILY);
+ if (type == TableType.TWO_CF) {
+ table = util.createTable(Bytes.toBytesBinary(tableName),
+ new byte[][]{COLUMNFAMILY, COLUMNFAMILY2});
+ } else {
+ table = util.createTable(Bytes.toBytesBinary(tableName),
+ COLUMNFAMILY);
+ }
+ lastTableType = type;
} catch (Exception e) {
table = new HTable(conf, Bytes.toBytesBinary(tableName));
}
@@ -1528,6 +1573,11 @@ public class TestHBaseStorage {
// prefixed_col_d: string type
put.add(COLUMNFAMILY, Bytes.toBytes("prefixed_col_d"),
Bytes.toBytes("PrefixedText_" + i));
+ // another cf
+ if (type == TableType.TWO_CF) {
+ put.add(COLUMNFAMILY2, Bytes.toBytes("col_x"),
+ Bytes.toBytes(i));
+ }
table.put(put);
} else {
// row key: string type
@@ -1548,6 +1598,11 @@ public class TestHBaseStorage {
// prefixed_col_d: string type
put.add(COLUMNFAMILY, Bytes.toBytes("prefixed_col_d"),
("PrefixedText_" + i).getBytes());
+ // another cf
+ if (type == TableType.TWO_CF) {
+ put.add(COLUMNFAMILY2, Bytes.toBytes("col_x"),
+ (i + "").getBytes());
+ }
table.put(put);
}
}
Modified: pig/branches/spark/test/org/apache/pig/test/TestJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestJobControlCompiler.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestJobControlCompiler.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestJobControlCompiler.java Wed Feb 22 09:43:41 2017
@@ -63,7 +63,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
@@ -131,7 +130,7 @@ public class TestJobControlCompiler {
// verifying the jar gets on distributed cache
Path[] fileClassPaths = DistributedCache.getFileClassPaths(jobConf);
// guava jar is not shipped with Hadoop 2.x
- Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), HadoopShims.isHadoopYARN() ? 5 : 6, fileClassPaths.length);
+ Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), 5, fileClassPaths.length);
Path distributedCachePath = fileClassPaths[0];
Assert.assertEquals("ends with jar name: "+distributedCachePath, distributedCachePath.getName(), tmpFile.getName());
// hadoop bug requires path to not contain hdfs://hotname in front
@@ -235,22 +234,12 @@ public class TestJobControlCompiler {
// 4. another.jar and 5. udf1.jar, and not duplicate udf.jar
System.out.println("cache.files= " + Arrays.toString(cacheURIs));
System.out.println("classpath.files= " + Arrays.toString(fileClassPaths));
- if (HadoopShims.isHadoopYARN()) {
- // Default jars - 5 (pig, antlr, joda-time, automaton)
- // Other jars - 10 (udf.jar#udf.jar, udf1.jar#diffname.jar, udf2.jar, udf1.jar, another.jar
- Assert.assertEquals("size 9 for " + Arrays.toString(cacheURIs), 9,
- Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size());
- Assert.assertEquals("size 9 for " + Arrays.toString(fileClassPaths), 9,
- Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size());
- } else {
- // Default jars - 5. Has guava in addition
- // There will be same entries duplicated for udf.jar and udf2.jar
- Assert.assertEquals("size 12 for " + Arrays.toString(cacheURIs), 12,
- Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size());
- Assert.assertEquals("size 12 for " + Arrays.toString(fileClassPaths), 12,
- Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size());
- }
-
+ // Default jars - 5 (pig, antlr, joda-time, automaton)
+ // Other jars - 10 (udf.jar#udf.jar, udf1.jar#diffname.jar, udf2.jar, udf1.jar, another.jar
+ Assert.assertEquals("size 9 for " + Arrays.toString(cacheURIs), 9,
+ Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size());
+ Assert.assertEquals("size 9 for " + Arrays.toString(fileClassPaths), 9,
+ Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size());
// Count occurrences of the resources
Map<String, Integer> occurrences = new HashMap<String, Integer>();
@@ -259,22 +248,12 @@ public class TestJobControlCompiler {
val = (val == null) ? 1 : ++val;
occurrences.put(cacheURI.toString(), val);
}
- if (HadoopShims.isHadoopYARN()) {
- Assert.assertEquals(9, occurrences.size());
- } else {
- Assert.assertEquals(10, occurrences.size()); //guava jar in addition
- }
+ Assert.assertEquals(9, occurrences.size());
for (String file : occurrences.keySet()) {
- if (!HadoopShims.isHadoopYARN() && (file.endsWith("udf.jar") || file.endsWith("udf2.jar"))) {
- // Same path added twice which is ok. It should not be a shipped to hdfs temp path.
- // We assert path is same by checking count
- Assert.assertEquals("Two occurrences for " + file, 2, (int) occurrences.get(file));
- } else {
- // check that only single occurrence even though we added once to dist cache (simulating via Oozie)
- // and second time through pig register jar when there is symlink
- Assert.assertEquals("One occurrence for " + file, 1, (int) occurrences.get(file));
- }
+ // check that only single occurrence even though we added once to dist cache (simulating via Oozie)
+ // and second time through pig register jar when there is symlink
+ Assert.assertEquals("One occurrence for " + file, 1, (int) occurrences.get(file));
}
}
Modified: pig/branches/spark/test/org/apache/pig/test/TestLineageFindRelVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestLineageFindRelVisitor.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestLineageFindRelVisitor.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestLineageFindRelVisitor.java Wed Feb 22 09:43:41 2017
@@ -20,16 +20,34 @@ package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import junit.framework.Assert;
+import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
+import java.util.Iterator;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.pig.FuncSpec;
import org.apache.pig.LoadCaster;
+import org.apache.pig.PigServer;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.builtin.Utf8StorageConverter;
+import org.apache.pig.builtin.mock.Storage;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.DependencyOrderWalker;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanWalker;
+import org.apache.pig.newplan.ReverseDependencyOrderWalker;
+import org.apache.pig.newplan.logical.expression.CastExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
+import org.apache.pig.newplan.logical.optimizer.AllExpressionVisitor;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.visitor.LineageFindRelVisitor;
@@ -42,6 +60,13 @@ import org.junit.Test;
public class TestLineageFindRelVisitor {
+ private PigServer pig ;
+
+ @Before
+ public void setUp() throws Exception{
+ pig = new PigServer(Util.getLocalTestMode()) ;
+ }
+
public static class SillyLoadCasterWithExtraConstructor extends Utf8StorageConverter {
public SillyLoadCasterWithExtraConstructor(String ignored) {
super();
@@ -69,6 +94,13 @@ public class TestLineageFindRelVisitor {
}
}
+ public static class ToTupleWithCustomLoadCaster extends org.apache.pig.builtin.TOTUPLE {
+ @Override
+ public LoadCaster getLoadCaster() throws IOException {
+ return new SillyLoadCasterWithExtraConstructor("ignored");
+ }
+ }
+
@Test
public void testhaveIdenticalCasters() throws Exception {
LogicalPlan lp = new LogicalPlan();
@@ -123,6 +155,169 @@ public class TestLineageFindRelVisitor {
(Boolean) testMethod.invoke(lineageFindRelVisitor,
casterWithExtraConstuctorSpec, casterWithExtraConstuctorSpec) );
- Assert.assertEquals("Loader should be instantiated at most once.", SillyLoaderWithLoadCasterWithExtraConstructor.counter, 1);
+ Assert.assertEquals("Loader should be instantiated at most once.", 1, SillyLoaderWithLoadCasterWithExtraConstructor.counter);
+ }
+
+ @Test
+ public void testIdenticalColumnUDFForwardingLoadCaster() throws Exception {
+ Storage.Data data = Storage.resetData(pig);
+ data.set("input",
+ Storage.tuple(Storage.map(
+ "key1",new DataByteArray("aaa"),
+ "key2",new DataByteArray("bbb"),
+ "key3",new DataByteArray("ccc"))),
+ Storage.tuple(Storage.map(
+ "key1",new DataByteArray("zzz"),
+ "key2",new DataByteArray("yyy"),
+ "key3",new DataByteArray("xxx"))));
+ pig.setBatchOn();
+ pig.registerQuery("A = load 'input' using mock.Storage() as (m:[bytearray]);");
+ pig.registerQuery("B = foreach A GENERATE m#'key1' as key1, m#'key2' as key2; "
+ // this equal comparison creates implicit typecast to chararray
+ // which requires loadcaster
+ + "C = FILTER B by key1 == 'aaa' and key2 == 'bbb';");
+ pig.registerQuery("store C into 'output' using mock.Storage();");
+
+ pig.executeBatch();
+
+ List<Tuple> actualResults = data.get("output");
+ List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {"('aaa', 'bbb')"});
+ Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
+ }
+
+ @Test
+ public void testUDFForwardingLoadCaster() throws Exception {
+ Storage.Data data = Storage.resetData(pig);
+ data.set("input",
+ Storage.tuple(new DataByteArray("aaa")),
+ Storage.tuple(new DataByteArray("bbb")));
+ pig.setBatchOn();
+ String query = "A = load 'input' using mock.Storage() as (a1:bytearray);"
+ + "B = foreach A GENERATE TOTUPLE(a1) as tupleA;"
+ + "C = foreach B GENERATE (chararray) tupleA.a1;" //using loadcaster
+ + "store C into 'output' using mock.Storage();";
+
+ LogicalPlan lp = Util.parse(query, pig.getPigContext());
+ Util.optimizeNewLP(lp);
+
+ CastFinder cf = new CastFinder(lp);
+ cf.visit();
+ Assert.assertEquals("There should be only one typecast expression.", 1, cf.casts.size());
+ Assert.assertEquals("Loadcaster should be coming from the Load", "mock.Storage", cf.casts.get(0).getFuncSpec().getClassName());
+
+ pig.registerQuery(query);
+ pig.executeBatch();
+
+ List<Tuple> actualResults = data.get("output");
+ List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {"('aaa')", "('bbb')"});
+ Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
+ }
+
+ @Test
+ public void testUDFgetLoadCaster() throws Exception {
+ Storage.Data data = Storage.resetData(pig);
+ data.set("input",
+ Storage.tuple(new DataByteArray("aaa")),
+ Storage.tuple(new DataByteArray("bbb")));
+ pig.setBatchOn();
+ String query = "A = load 'input' using mock.Storage() as (a1:bytearray);"
+ + "B = foreach A GENERATE org.apache.pig.test.TestLineageFindRelVisitor$ToTupleWithCustomLoadCaster(a1) as tupleA;"
+ + "C = foreach B GENERATE (chararray) tupleA.a1;" //using loadcaster
+ + "store C into 'output' using mock.Storage();";
+
+ pig.registerQuery(query);
+ pig.executeBatch();
+
+ LogicalPlan lp = Util.parse(query, pig.getPigContext());
+ Util.optimizeNewLP(lp);
+
+ CastFinder cf = new CastFinder(lp);
+ cf.visit();
+ Assert.assertEquals("There should be only one typecast expression.", 1, cf.casts.size());
+ Assert.assertEquals("Loadcaster should be coming from the UDF", "org.apache.pig.test.TestLineageFindRelVisitor$ToTupleWithCustomLoadCaster", cf.casts.get(0).getFuncSpec().getClassName());
+
+ List<Tuple> actualResults = data.get("output");
+ List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {"('aaa')", "('bbb')"});
+ Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
+ }
+
+ @Test
+ public void testUDFForwardingLoadCasterWithMultipleParams() throws Exception{
+ File inputfile = Util.createFile(new String[]{"123","456","789"});
+
+ pig.registerQuery("A = load '"
+ + inputfile.toString()
+ + "' using PigStorage() as (a1:bytearray);\n");
+ pig.registerQuery("B = load '"
+ + inputfile.toString()
+ + "' using PigStorage() as (b1:bytearray);\n");
+ pig.registerQuery("C = join A by a1, B by b1;\n");
+ pig.registerQuery("D = FOREACH C GENERATE TOTUPLE(a1,b1) as tupleD;\n");
+ pig.registerQuery("E = FOREACH D GENERATE (chararray) tupleD.a1;\n");
+ Iterator<Tuple> iter = pig.openIterator("E");
+
+ Assert.assertEquals("123", iter.next().get(0));
+ Assert.assertEquals("456", iter.next().get(0));
+ Assert.assertEquals("789", iter.next().get(0));
+ }
+
+ @Test
+ public void testNegativeUDFForwardingLoadCasterWithMultipleParams() throws Exception {
+ File inputfile = Util.createFile(new String[]{"123","456","789"});
+
+ pig.registerQuery("A = load '"
+ + inputfile.toString()
+ + "' using PigStorage() as (a1:bytearray);\n");
+ pig.registerQuery("B = load '"
+ + inputfile.toString()
+ + "' using org.apache.pig.test.TestLineageFindRelVisitor$SillyLoaderWithLoadCasterWithExtraConstructor2() as (b1:bytearray);\n");
+ pig.registerQuery("C = join A by a1, B by b1;\n");
+ pig.registerQuery("D = FOREACH C GENERATE TOTUPLE(a1,b1) as tupleD;\n");
+ pig.registerQuery("E = FOREACH D GENERATE (chararray) tupleD.a1;\n");
+ try {
+ Iterator<Tuple> iter = pig.openIterator("E");
+
+ // this should fail since above typecast cannot determine which
+ // loadcaster to use (one from PigStroage and another from
+ // SillyLoaderWithLoadCasterWithExtraConstructor2)
+ fail("Above typecast should fail since it cannot determine which loadcaster to use.");
+ } catch (IOException e) {
+ Assert.assertTrue(e.getMessage().contains("Unable to open iterator for alias E"));
+ }
+
+
+ }
+
+ /**
+ * Collects every CastExpression found in the plan's expression plans into {@link #casts} (copied from TestTypeCheckingValidatorNewLP.java).
+ */
+ static class CastFinder extends AllExpressionVisitor {
+ List<CastExpression> casts = new ArrayList<CastExpression>();
+
+ public CastFinder(OperatorPlan plan)
+ throws FrontendException {
+ super(plan, new DependencyOrderWalker(plan));
+ }
+
+ @Override
+ protected LogicalExpressionVisitor getVisitor(
+ LogicalExpressionPlan exprPlan) throws FrontendException {
+ return new CastExpFinder(exprPlan, new ReverseDependencyOrderWalker(exprPlan));
+ }
+
+ class CastExpFinder extends LogicalExpressionVisitor{
+ protected CastExpFinder(OperatorPlan p, PlanWalker walker)
+ throws FrontendException {
+ super(p, walker);
+ }
+
+ @Override
+ public void visit(CastExpression cExp){
+ casts.add(cExp);
+ }
+ }
}
}
Modified: pig/branches/spark/test/org/apache/pig/test/TestLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestLoad.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestLoad.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestLoad.java Wed Feb 22 09:43:41 2017
@@ -67,6 +67,8 @@ public class TestLoad {
static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
+ private static final String WORKING_DIR = "/tmp/test" + java.util.UUID.randomUUID();
+
@Before
public void setUp() throws Exception {
FileLocalizer.deleteTempFiles();
@@ -118,7 +120,7 @@ public class TestLoad {
public void testLoadRemoteRel() throws Exception {
for (PigServer pig : servers) {
pc = pig.getPigContext();
- checkLoadPath("test","/tmp/test");
+ checkLoadPath("test", WORKING_DIR + "/test");
}
}
@@ -127,7 +129,7 @@ public class TestLoad {
for (PigServer pig : servers) {
pc = pig.getPigContext();
boolean noConversionExpected = true;
- checkLoadPath("/tmp/test","/tmp/test", noConversionExpected);
+ checkLoadPath(WORKING_DIR + "/test", WORKING_DIR + "/test", noConversionExpected);
}
}
@@ -135,7 +137,7 @@ public class TestLoad {
public void testLoadRemoteRelScheme() throws Exception {
for (PigServer pig : servers) {
pc = pig.getPigContext();
- checkLoadPath("test","/tmp/test");
+ checkLoadPath("test", WORKING_DIR + "/test");
}
}
@@ -143,11 +145,11 @@ public class TestLoad {
public void testLoadRemoteAbsScheme() throws Exception {
pc = servers[0].getPigContext();
boolean noConversionExpected = true;
- checkLoadPath("hdfs:/tmp/test","hdfs:/tmp/test", noConversionExpected);
+ checkLoadPath("hdfs:" + WORKING_DIR + "/test","hdfs:" + WORKING_DIR + "/test", noConversionExpected);
// check if a location 'hdfs:<abs path>' can actually be read using PigStorage
String[] inputFileNames = new String[] {
- "/tmp/TestLoad-testLoadRemoteAbsSchema-input.txt"};
+ WORKING_DIR + "/TestLoad-testLoadRemoteAbsSchema-input.txt"};
testLoadingMultipleFiles(inputFileNames, "hdfs:" + inputFileNames[0]);
}
@@ -162,7 +164,7 @@ public class TestLoad {
for (PigServer pig : servers) {
pc = pig.getPigContext();
boolean noConversionExpected = true;
- checkLoadPath("/tmp/foo/../././","/tmp/foo/.././.", noConversionExpected);
+ checkLoadPath(WORKING_DIR + "/foo/../././", WORKING_DIR + "/foo/.././.", noConversionExpected);
}
}
@@ -170,7 +172,7 @@ public class TestLoad {
public void testGlobChars() throws Exception {
for (PigServer pig : servers) {
pc = pig.getPigContext();
- checkLoadPath("t?s*","/tmp/t?s*");
+ checkLoadPath("t?s*", WORKING_DIR + "/t?s*");
}
}
@@ -178,7 +180,7 @@ public class TestLoad {
public void testCommaSeparatedString() throws Exception {
for (PigServer pig : servers) {
pc = pig.getPigContext();
- checkLoadPath("usr/pig/a,usr/pig/b","/tmp/usr/pig/a,/tmp/usr/pig/b");
+ checkLoadPath("usr/pig/a,b", WORKING_DIR + "/usr/pig/a,"+ WORKING_DIR + "/b");
}
}
@@ -186,7 +188,7 @@ public class TestLoad {
public void testCommaSeparatedString2() throws Exception {
for (PigServer pig : servers) {
pc = pig.getPigContext();
- checkLoadPath("t?s*,test","/tmp/t?s*,/tmp/test");
+ checkLoadPath("t?s*,test", WORKING_DIR + "/t?s*,"+ WORKING_DIR + "/test");
}
}
@@ -196,14 +198,14 @@ public class TestLoad {
PigServer pig = servers[0];
pc = pig.getPigContext();
boolean noConversionExpected = true;
- checkLoadPath("hdfs:/tmp/test,hdfs:/tmp/test2,hdfs:/tmp/test3",
- "hdfs:/tmp/test,hdfs:/tmp/test2,hdfs:/tmp/test3", noConversionExpected );
+ checkLoadPath("hdfs:"+ WORKING_DIR + "/test,hdfs:" + WORKING_DIR + "/test2,hdfs:" + WORKING_DIR + "/test3",
+ "hdfs:" + WORKING_DIR + "/test,hdfs:" + WORKING_DIR + "/test2,hdfs:" + WORKING_DIR + "/test3", noConversionExpected );
// check if a location 'hdfs:<abs path>,hdfs:<abs path>' can actually be
// read using PigStorage
String[] inputFileNames = new String[] {
- "/tmp/TestLoad-testCommaSeparatedString3-input1.txt",
- "/tmp/TestLoad-testCommaSeparatedString3-input2.txt"};
+ WORKING_DIR + "/TestLoad-testCommaSeparatedString3-input1.txt",
+ WORKING_DIR + "/TestLoad-testCommaSeparatedString3-input2.txt"};
String inputString = "hdfs:" + inputFileNames[0] + ",hdfs:" +
inputFileNames[1];
testLoadingMultipleFiles(inputFileNames, inputString);
@@ -214,7 +216,7 @@ public class TestLoad {
public void testCommaSeparatedString4() throws Exception {
for (PigServer pig : servers) {
pc = pig.getPigContext();
- checkLoadPath("usr/pig/{a,c},usr/pig/b","/tmp/usr/pig/{a,c},/tmp/usr/pig/b");
+ checkLoadPath("usr/pig/{a,c},usr/pig/b", WORKING_DIR + "/usr/pig/{a,c}," + WORKING_DIR + "/usr/pig/b");
}
}
@@ -222,18 +224,18 @@ public class TestLoad {
public void testCommaSeparatedString5() throws Exception {
for (PigServer pig : servers) {
pc = pig.getPigContext();
- checkLoadPath("/usr/pig/{a,c},usr/pig/b","/usr/pig/{a,c},/tmp/usr/pig/b");
+ checkLoadPath("/usr/pig/{a,c},b", "/usr/pig/{a,c}," + WORKING_DIR + "/b");
}
// check if a location '<abs path>,<relative path>' can actually be
// read using PigStorage
- String loadLocationString = "/tmp/TestLoad-testCommaSeparatedStringMixed-input{1,2}.txt," +
- "TestLoad-testCommaSeparatedStringMixed-input3.txt"; // current working dir is set to /tmp in checkLoadPath()
+ String loadLocationString = WORKING_DIR + "/TestLoad-testCommaSeparatedStringMixed-input{1,2}.txt," +
+ "TestLoad-testCommaSeparatedStringMixed-input3.txt"; // current working dir is set to WORKING_DIR in checkLoadPath()
String[] inputFileNames = new String[] {
- "/tmp/TestLoad-testCommaSeparatedStringMixed-input1.txt",
- "/tmp/TestLoad-testCommaSeparatedStringMixed-input2.txt",
- "/tmp/TestLoad-testCommaSeparatedStringMixed-input3.txt",};
+ WORKING_DIR + "/TestLoad-testCommaSeparatedStringMixed-input1.txt",
+ WORKING_DIR + "/TestLoad-testCommaSeparatedStringMixed-input2.txt",
+ WORKING_DIR + "/TestLoad-testCommaSeparatedStringMixed-input3.txt",};
pc = servers[0].getPigContext(); // test in map reduce mode
testLoadingMultipleFiles(inputFileNames, loadLocationString);
}
@@ -242,7 +244,7 @@ public class TestLoad {
public void testCommaSeparatedString6() throws Exception {
for (PigServer pig : servers) {
pc = pig.getPigContext();
- checkLoadPath("usr/pig/{a,c},/usr/pig/b","/tmp/usr/pig/{a,c},/usr/pig/b");
+ checkLoadPath("usr/pig/{a,c},/usr/pig/b", WORKING_DIR + "/usr/pig/{a,c},/usr/pig/b");
}
}
@@ -250,7 +252,7 @@ public class TestLoad {
public void testNonDfsLocation() throws Exception {
String nonDfsUrl = "har:///user/foo/f.har";
String query = "a = load '" + nonDfsUrl + "' using PigStorage('\t','-noschema');" +
- "store a into 'output';";
+ "store a into 'pigoutput';";
LogicalPlan lp = Util.buildLp(servers[1], query);
LOLoad load = (LOLoad) lp.getSources().get(0);
nonDfsUrl = nonDfsUrl.replaceFirst("/$", "");
@@ -308,7 +310,7 @@ public class TestLoad {
pc.getProperties().setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + b);
DataStorage dfs = pc.getDfs();
- dfs.setActiveContainer(dfs.asContainer("/tmp"));
+ dfs.setActiveContainer(dfs.asContainer(WORKING_DIR));
Map<String, String> fileNameMap = new HashMap<String, String>();
QueryParserDriver builder = new QueryParserDriver(pc, "Test-Load", fileNameMap);
Modified: pig/branches/spark/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java Wed Feb 22 09:43:41 2017
@@ -45,12 +45,8 @@ public abstract class TestLoaderStorerSh
"store a into 'ooo';";
PhysicalPlan pp = Util.buildPp(pigServer, query);
- String hadoopVersion = "20S";
- if (Utils.isHadoop23() || Utils.isHadoop2()) {
- hadoopVersion = "23";
- }
- String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
- "hive-shims-0." + hadoopVersion, "hive-shims-common", "kryo"};
+ String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
+ "hive-shims-0.23", "hive-shims-common", "kryo"};
checkPlan(pp, expectedJars, 6, pigServer.getPigContext());
}
@@ -61,12 +57,8 @@ public abstract class TestLoaderStorerSh
"store a into 'ooo' using OrcStorage;";
PhysicalPlan pp = Util.buildPp(pigServer, query);
- String hadoopVersion = "20S";
- if (Utils.isHadoop23() || Utils.isHadoop2()) {
- hadoopVersion = "23";
- }
- String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
- "hive-shims-0." + hadoopVersion, "hive-shims-common", "kryo"};
+ String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
+ "hive-shims-0.23", "hive-shims-common", "kryo"};
checkPlan(pp, expectedJars, 6, pigServer.getPigContext());
}
Modified: pig/branches/spark/test/org/apache/pig/test/TestLocal.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestLocal.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestLocal.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestLocal.java Wed Feb 22 09:43:41 2017
@@ -39,6 +39,7 @@ import org.apache.pig.builtin.PigStorage
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultTuple;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.FileLocalizer;
@@ -166,7 +167,8 @@ public class TestLocal {
public Tuple getNext() throws IOException {
if (count < COUNT) {
- Tuple t = TupleFactory.getInstance().newTuple(Integer.toString(count++));
+ Tuple t = new DefaultTuple();
+ t.append(Integer.toString(count++));
return t;
}