You are viewing a plain text version of this content. The canonical version is available at the original mail-archive link (the hyperlink was lost in this plain-text copy).
Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/24 03:34:40 UTC
svn commit: r1784224 [11/17] - in /pig/branches/spark: ./ bin/ conf/
contrib/piggybank/java/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachel...
Modified: pig/branches/spark/test/org/apache/pig/test/TestCounters.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestCounters.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestCounters.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestCounters.java Fri Feb 24 03:34:37 2017
@@ -30,17 +30,17 @@ import java.util.Map;
import java.util.Random;
import org.apache.hadoop.fs.Path;
+import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.tools.pigstats.InputStats;
import org.apache.pig.tools.pigstats.JobStats;
-import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStats.JobGraph;
+import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
import org.junit.AfterClass;
-import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -49,8 +49,8 @@ import org.junit.runners.JUnit4;
public class TestCounters {
String file = "input.txt";
- static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
-
+ static MiniCluster cluster = MiniCluster.buildCluster();
+
final int MAX = 100*1000;
Random r = new Random();
@@ -59,7 +59,7 @@ public class TestCounters {
public static void oneTimeTearDown() throws Exception {
cluster.shutDown();
}
-
+
@Test
public void testMapOnly() throws IOException, ExecException {
int count = 0;
@@ -70,13 +70,13 @@ public class TestCounters {
if(t > 50) count ++;
}
pw.close();
- PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+ PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = filter a by $0 > 50;");
pigServer.registerQuery("c = foreach b generate $0 - 50;");
ExecJob job = pigServer.store("c", "output_map_only");
PigStats pigStats = job.getStatistics();
-
+
//counting the no. of bytes in the output file
//long filesize = cluster.getFileSystem().getFileStatus(new Path("output_map_only")).getLen();
InputStream is = FileLocalizer.open(FileLocalizer.fullPath(
@@ -85,9 +85,9 @@ public class TestCounters {
long filesize = 0;
while(is.read() != -1) filesize++;
-
+
is.close();
-
+
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("output_map_only"), true);
@@ -98,7 +98,7 @@ public class TestCounters {
JobGraph jg = pigStats.getJobGraph();
Iterator<JobStats> iter = jg.iterator();
while (iter.hasNext()) {
- JobStats js = iter.next();
+ MRJobStats js = (MRJobStats) iter.next();
System.out.println("Map input records : " + js.getMapInputRecords());
assertEquals(MAX, js.getMapInputRecords());
@@ -123,20 +123,20 @@ public class TestCounters {
count ++;
}
pw.close();
- PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+ PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = filter a by $0 > 50;");
pigServer.registerQuery("c = foreach b generate $0 - 50;");
ExecJob job = pigServer.store("c", "output_map_only", "BinStorage");
PigStats pigStats = job.getStatistics();
-
+
InputStream is = FileLocalizer.open(FileLocalizer.fullPath(
"output_map_only", pigServer.getPigContext()),
pigServer.getPigContext());
long filesize = 0;
while(is.read() != -1) filesize++;
-
+
is.close();
cluster.getFileSystem().delete(new Path(file), true);
@@ -149,8 +149,8 @@ public class TestCounters {
JobGraph jp = pigStats.getJobGraph();
Iterator<JobStats> iter = jp.iterator();
while (iter.hasNext()) {
- JobStats js = iter.next();
-
+ MRJobStats js = (MRJobStats) iter.next();
+
System.out.println("Map input records : " + js.getMapInputRecords());
assertEquals(MAX, js.getMapInputRecords());
System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -158,7 +158,7 @@ public class TestCounters {
assertEquals(0, js.getReduceInputRecords());
assertEquals(0, js.getReduceOutputRecords());
}
-
+
System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten());
assertEquals(filesize, pigStats.getBytesWritten());
}
@@ -183,7 +183,7 @@ public class TestCounters {
if(nos[i] > 0) count ++;
}
- PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+ PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = group a by $0;");
pigServer.registerQuery("c = foreach b generate group;");
@@ -195,7 +195,7 @@ public class TestCounters {
long filesize = 0;
while(is.read() != -1) filesize++;
-
+
is.close();
cluster.getFileSystem().delete(new Path(file), true);
@@ -208,7 +208,7 @@ public class TestCounters {
JobGraph jp = pigStats.getJobGraph();
Iterator<JobStats> iter = jp.iterator();
while (iter.hasNext()) {
- JobStats js = iter.next();
+ MRJobStats js = (MRJobStats) iter.next();
System.out.println("Map input records : " + js.getMapInputRecords());
assertEquals(MAX, js.getMapInputRecords());
System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -242,7 +242,7 @@ public class TestCounters {
if(nos[i] > 0) count ++;
}
- PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+ PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = group a by $0;");
pigServer.registerQuery("c = foreach b generate group;");
@@ -253,9 +253,9 @@ public class TestCounters {
pigServer.getPigContext()), pigServer.getPigContext());
long filesize = 0;
while(is.read() != -1) filesize++;
-
+
is.close();
-
+
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("output"), true);
@@ -266,7 +266,7 @@ public class TestCounters {
JobGraph jp = pigStats.getJobGraph();
Iterator<JobStats> iter = jp.iterator();
while (iter.hasNext()) {
- JobStats js = iter.next();
+ MRJobStats js = (MRJobStats) iter.next();
System.out.println("Map input records : " + js.getMapInputRecords());
assertEquals(MAX, js.getMapInputRecords());
System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -300,7 +300,7 @@ public class TestCounters {
if(nos[i] > 0) count ++;
}
- PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+ PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = group a by $0;");
pigServer.registerQuery("c = foreach b generate group, SUM(a.$1);");
@@ -311,20 +311,20 @@ public class TestCounters {
pigServer.getPigContext()), pigServer.getPigContext());
long filesize = 0;
while(is.read() != -1) filesize++;
-
+
is.close();
-
+
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("output"), true);
System.out.println("============================================");
System.out.println("Test case MapCombineReduce");
System.out.println("============================================");
-
+
JobGraph jp = pigStats.getJobGraph();
Iterator<JobStats> iter = jp.iterator();
while (iter.hasNext()) {
- JobStats js = iter.next();
+ MRJobStats js = (MRJobStats) iter.next();
System.out.println("Map input records : " + js.getMapInputRecords());
assertEquals(MAX, js.getMapInputRecords());
System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -337,7 +337,7 @@ public class TestCounters {
System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten());
assertEquals(filesize, pigStats.getBytesWritten());
}
-
+
@Test
public void testMapCombineReduceBinStorage() throws IOException, ExecException {
int count = 0;
@@ -358,20 +358,20 @@ public class TestCounters {
if(nos[i] > 0) count ++;
}
- PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+ PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = group a by $0;");
pigServer.registerQuery("c = foreach b generate group, SUM(a.$1);");
ExecJob job = pigServer.store("c", "output", "BinStorage");
PigStats pigStats = job.getStatistics();
-
+
InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output",
pigServer.getPigContext()), pigServer.getPigContext());
long filesize = 0;
while(is.read() != -1) filesize++;
-
+
is.close();
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("output"), true);
@@ -379,11 +379,11 @@ public class TestCounters {
System.out.println("============================================");
System.out.println("Test case MapCombineReduce");
System.out.println("============================================");
-
+
JobGraph jp = pigStats.getJobGraph();
Iterator<JobStats> iter = jp.iterator();
while (iter.hasNext()) {
- JobStats js = iter.next();
+ MRJobStats js = (MRJobStats) iter.next();
System.out.println("Map input records : " + js.getMapInputRecords());
assertEquals(MAX, js.getMapInputRecords());
System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -399,8 +399,6 @@ public class TestCounters {
@Test
public void testMultipleMRJobs() throws IOException, ExecException {
- Assume.assumeTrue("Skip this test for TEZ. Assert is done only for first MR job",
- Util.isMapredExecType(cluster.getExecType()));
int count = 0;
PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
int [] nos = new int[10];
@@ -415,38 +413,38 @@ public class TestCounters {
}
pw.close();
- for(int i = 0; i < 10; i++) {
+ for(int i = 0; i < 10; i++) {
if(nos[i] > 0) count ++;
}
- PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+ PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = order a by $0;");
pigServer.registerQuery("c = group b by $0;");
pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);");
ExecJob job = pigServer.store("d", "output");
PigStats pigStats = job.getStatistics();
-
+
InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output",
pigServer.getPigContext()), pigServer.getPigContext());
long filesize = 0;
while(is.read() != -1) filesize++;
-
+
is.close();
-
+
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("output"), true);
-
+
System.out.println("============================================");
System.out.println("Test case MultipleMRJobs");
System.out.println("============================================");
-
+
JobGraph jp = pigStats.getJobGraph();
- JobStats js = (JobStats)jp.getSinks().get(0);
-
+ MRJobStats js = (MRJobStats)jp.getSinks().get(0);
+
System.out.println("Job id: " + js.getName());
System.out.println(jp.toString());
-
+
System.out.println("Map input records : " + js.getMapInputRecords());
assertEquals(MAX, js.getMapInputRecords());
System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -455,12 +453,12 @@ public class TestCounters {
assertEquals(count, js.getReduceInputRecords());
System.out.println("Reduce output records : " + js.getReduceOutputRecords());
assertEquals(count, js.getReduceOutputRecords());
-
+
System.out.println("Hdfs bytes written : " + js.getHdfsBytesWritten());
assertEquals(filesize, js.getHdfsBytesWritten());
}
-
+
@Test
public void testMapOnlyMultiQueryStores() throws Exception {
PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
@@ -469,8 +467,8 @@ public class TestCounters {
pw.println(t);
}
pw.close();
-
- PigServer pigServer = new PigServer(cluster.getExecType(),
+
+ PigServer pigServer = new PigServer(ExecType.MAPREDUCE,
cluster.getProperties());
pigServer.setBatchOn();
pigServer.registerQuery("a = load '" + file + "';");
@@ -481,22 +479,22 @@ public class TestCounters {
List<ExecJob> jobs = pigServer.executeBatch();
PigStats stats = jobs.get(0).getStatistics();
assertTrue(stats.getOutputLocations().size() == 2);
-
+
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("/tmp/outout1"), true);
cluster.getFileSystem().delete(new Path("/tmp/outout2"), true);
- JobStats js = (JobStats)stats.getJobGraph().getSinks().get(0);
-
+ MRJobStats js = (MRJobStats)stats.getJobGraph().getSinks().get(0);
+
Map<String, Long> entry = js.getMultiStoreCounters();
long counter = 0;
for (Long val : entry.values()) {
counter += val;
}
-
- assertEquals(MAX, counter);
- }
-
+
+ assertEquals(MAX, counter);
+ }
+
@Test
public void testMultiQueryStores() throws Exception {
int[] nums = new int[100];
@@ -507,13 +505,13 @@ public class TestCounters {
nums[t]++;
}
pw.close();
-
+
int groups = 0;
for (int i : nums) {
if (i > 0) groups++;
}
-
- PigServer pigServer = new PigServer(cluster.getExecType(),
+
+ PigServer pigServer = new PigServer(ExecType.MAPREDUCE,
cluster.getProperties());
pigServer.setBatchOn();
pigServer.registerQuery("a = load '" + file + "';");
@@ -527,29 +525,29 @@ public class TestCounters {
pigServer.registerQuery("store g into '/tmp/outout2';");
List<ExecJob> jobs = pigServer.executeBatch();
PigStats stats = jobs.get(0).getStatistics();
-
+
assertTrue(stats.getOutputLocations().size() == 2);
-
+
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("/tmp/outout1"), true);
cluster.getFileSystem().delete(new Path("/tmp/outout2"), true);
- JobStats js = (JobStats)stats.getJobGraph().getSinks().get(0);
-
+ MRJobStats js = (MRJobStats)stats.getJobGraph().getSinks().get(0);
+
Map<String, Long> entry = js.getMultiStoreCounters();
long counter = 0;
for (Long val : entry.values()) {
counter += val;
}
-
- assertEquals(groups, counter);
- }
-
- /*
+
+ assertEquals(groups, counter);
+ }
+
+ /*
* IMPORTANT NOTE:
* COMMENTED OUT BECAUSE COUNTERS DO NOT CURRENTLY WORK IN LOCAL MODE -
* SEE PIG-1286 - UNCOMMENT WHEN IT IS FIXED
- */
+ */
// @Test
// public void testLocal() throws IOException, ExecException {
// int count = 0;
@@ -568,7 +566,7 @@ public class TestCounters {
// }
// pw.close();
//
-// for(int i = 0; i < 10; i++)
+// for(int i = 0; i < 10; i++)
// if(nos[i] > 0)
// count ++;
//
@@ -582,56 +580,56 @@ public class TestCounters {
// pigServer.registerQuery("c = group b by $0;");
// pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);");
// PigStats pigStats = pigServer.store("d", "file://" + out.getAbsolutePath()).getStatistics();
-// InputStream is = FileLocalizer.open(FileLocalizer.fullPath(out.getAbsolutePath(), pigServer.getPigContext()), cluster.getExecType(), pigServer.getPigContext().getDfs());
+// InputStream is = FileLocalizer.open(FileLocalizer.fullPath(out.getAbsolutePath(), pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
// long filesize = 0;
// while(is.read() != -1) filesize++;
-//
+//
// is.close();
// out.delete();
-//
+//
// //Map<String, Map<String, String>> stats = pigStats.getPigStats();
-//
+//
// assertEquals(10, pigStats.getRecordsWritten());
// assertEquals(110, pigStats.getBytesWritten());
//
// }
@Test
- public void testJoinInputCounters() throws Exception {
+ public void testJoinInputCounters() throws Exception {
testInputCounters("join");
}
-
+
@Test
- public void testCogroupInputCounters() throws Exception {
+ public void testCogroupInputCounters() throws Exception {
testInputCounters("cogroup");
}
-
+
@Test
- public void testSkewedInputCounters() throws Exception {
+ public void testSkewedInputCounters() throws Exception {
testInputCounters("skewed");
}
-
+
@Test
- public void testSelfJoinInputCounters() throws Exception {
+ public void testSelfJoinInputCounters() throws Exception {
testInputCounters("self-join");
}
-
+
private static boolean multiInputCreated = false;
-
+
private static int count = 0;
-
- private void testInputCounters(String keyword) throws Exception {
+
+ private void testInputCounters(String keyword) throws Exception {
String file1 = "multi-input1.txt";
String file2 = "multi-input2.txt";
-
+
String output = keyword;
-
+
if (keyword.equals("self-join")) {
file2 = file1;
keyword = "join";
}
-
- final int MAX_NUM_RECORDS = 100;
+
+ final int MAX_NUM_RECORDS = 100;
if (!multiInputCreated) {
PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file1));
for (int i = 0; i < MAX_NUM_RECORDS; i++) {
@@ -639,7 +637,7 @@ public class TestCounters {
pw.println(t);
}
pw.close();
-
+
PrintWriter pw2 = new PrintWriter(Util.createInputFile(cluster, file2));
for (int i = 0; i < MAX_NUM_RECORDS; i++) {
int t = r.nextInt(100);
@@ -651,8 +649,8 @@ public class TestCounters {
pw2.close();
multiInputCreated = true;
}
-
- PigServer pigServer = new PigServer(cluster.getExecType(),
+
+ PigServer pigServer = new PigServer(ExecType.MAPREDUCE,
cluster.getProperties());
pigServer.setBatchOn();
pigServer.registerQuery("a = load '" + file1 + "';");
@@ -663,7 +661,7 @@ public class TestCounters {
pigServer.registerQuery("c = join a by $0, b by $0 using 'skewed';");
}
ExecJob job = pigServer.store("c", output + "_output");
-
+
PigStats stats = job.getStatistics();
assertTrue(stats.isSuccessful());
List<InputStats> inputs = stats.getInputStats();
@@ -682,46 +680,4 @@ public class TestCounters {
}
}
}
-
- @Test
- public void testSplitUnionOutputCounters() throws Exception {
- PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
- PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, "splitunion-input"));
- for (int i = 0; i < 10; i++) {
- pw.println(i);
- }
- pw.close();
- String query =
- "a = load 'splitunion-input';" +
- "split a into b if $0 < 5, c otherwise;" +
- "d = union b, c;";
-
- pigServer.registerQuery(query);
-
- ExecJob job = pigServer.store("d", "splitunion-output-0", "PigStorage");
- PigStats stats1 = job.getStatistics();
-
- query =
- "a = load 'splitunion-input';" +
- "split a into b if $0 < 3, c if $0 > 2 and $0 < 6, d if $0 > 5;" +
- "e = distinct d;" +
- "f = union b, c, e;";
-
- pigServer.registerQuery(query);
-
- job = pigServer.store("f", "splitunion-output-1", "PigStorage");
- PigStats stats2 = job.getStatistics();
-
- PigStats[] pigStats = new PigStats[]{stats1, stats2};
- for (int i = 0; i < 2; i++) {
- PigStats stats = pigStats[i];
- assertTrue(stats.isSuccessful());
- List<OutputStats> outputs = stats.getOutputStats();
- assertEquals(1, outputs.size());
- OutputStats output = outputs.get(0);
- assertEquals("splitunion-output-" + i, output.getName());
- assertEquals(10, output.getNumberRecords());
- assertEquals(20, output.getBytes());
- }
- }
}
Modified: pig/branches/spark/test/org/apache/pig/test/TestDataBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestDataBag.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestDataBag.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestDataBag.java Fri Feb 24 03:34:37 2017
@@ -17,36 +17,17 @@
*/
package org.apache.pig.test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
+import java.util.*;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.PriorityQueue;
-import java.util.Random;
-import java.util.TreeSet;
-
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DefaultDataBag;
-import org.apache.pig.data.DefaultTuple;
-import org.apache.pig.data.DistinctDataBag;
-import org.apache.pig.data.InternalCachedBag;
-import org.apache.pig.data.InternalDistinctBag;
-import org.apache.pig.data.InternalSortedBag;
-import org.apache.pig.data.NonSpillableDataBag;
-import org.apache.pig.data.SingleTupleBag;
-import org.apache.pig.data.SortedDataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
+
+
+import org.apache.pig.data.*;
import org.apache.pig.impl.util.Spillable;
import org.junit.After;
import org.junit.Test;
@@ -55,7 +36,7 @@ import org.junit.Test;
/**
* This class will exercise the basic Pig data model and members. It tests for proper behavior in
* assignment and comparison, as well as function application.
- *
+ *
* @author dnm
*/
public class TestDataBag {
@@ -609,7 +590,7 @@ public class TestDataBag {
}
mgr.forceSpill();
}
-
+
assertEquals("Size of distinct data bag is incorrect", rightAnswer.size(), b.size());
// Read tuples back, hopefully they come out in the same order.
@@ -738,14 +719,14 @@ public class TestDataBag {
@Test
public void testDefaultBagFactory() throws Exception {
BagFactory f = BagFactory.getInstance();
-
+
DataBag bag = f.newDefaultBag();
DataBag sorted = f.newSortedBag(null);
DataBag distinct = f.newDistinctBag();
assertTrue("Expected a default bag", (bag instanceof DefaultDataBag));
assertTrue("Expected a sorted bag", (sorted instanceof SortedDataBag));
- assertTrue("Expected a distinct bag", (distinct instanceof DistinctDataBag));
+ assertTrue("Expected a distinct bag", (distinct instanceof DistinctDataBag));
}
@Test
@@ -775,7 +756,7 @@ public class TestDataBag {
try {
BagFactory f = BagFactory.getInstance();
} catch (RuntimeException re) {
- assertEquals("Expected does not extend BagFactory message",
+ assertEquals("Expected does not extend BagFactory message",
"Provided factory org.apache.pig.test.TestDataBag does not extend BagFactory!",
re.getMessage());
caughtIt = true;
@@ -794,7 +775,7 @@ public class TestDataBag {
BagFactory.resetSelf();
}
-
+
@Test
public void testNonSpillableDataBagEquals1() throws Exception {
String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
@@ -808,7 +789,7 @@ public class TestDataBag {
}
assertEquals(bg1, bg2);
}
-
+
@Test
public void testNonSpillableDataBagEquals2() throws Exception {
String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
@@ -823,7 +804,7 @@ public class TestDataBag {
}
assertEquals(bg1, bg2);
}
-
+
@Test
public void testDefaultDataBagEquals1() throws Exception {
String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
@@ -839,7 +820,7 @@ public class TestDataBag {
}
assertEquals(bg1, bg2);
}
-
+
@Test
public void testDefaultDataBagEquals2() throws Exception {
String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
@@ -856,35 +837,35 @@ public class TestDataBag {
}
assertEquals(bg1, bg2);
}
-
- public void testInternalCachedBag() throws Exception {
+
+ public void testInternalCachedBag() throws Exception {
// check adding empty tuple
DataBag bg0 = new InternalCachedBag();
bg0.add(TupleFactory.getInstance().newTuple());
bg0.add(TupleFactory.getInstance().newTuple());
assertEquals(bg0.size(), 2);
-
+
// check equal of bags
DataBag bg1 = new InternalCachedBag(1, 0.5f);
assertEquals(bg1.size(), 0);
-
+
String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
for (int i = 0; i < tupleContents.length; i++) {
bg1.add(Util.createTuple(tupleContents[i]));
}
-
+
// check size, and isSorted(), isDistinct()
assertEquals(bg1.size(), 3);
assertFalse(bg1.isSorted());
assertFalse(bg1.isDistinct());
-
+
tupleContents = new String[][] {{"c", "d" }, {"a", "b"},{ "e", "f"} };
DataBag bg2 = new InternalCachedBag(1, 0.5f);
for (int i = 0; i < tupleContents.length; i++) {
bg2.add(Util.createTuple(tupleContents[i]));
}
assertEquals(bg1, bg2);
-
+
// check bag with data written to disk
DataBag bg3 = new InternalCachedBag(1, 0.0f);
tupleContents = new String[][] {{ "e", "f"}, {"c", "d" }, {"a", "b"}};
@@ -892,7 +873,7 @@ public class TestDataBag {
bg3.add(Util.createTuple(tupleContents[i]));
}
assertEquals(bg1, bg3);
-
+
// check iterator
Iterator<Tuple> iter = bg3.iterator();
DataBag bg4 = new InternalCachedBag(1, 0.0f);
@@ -900,7 +881,7 @@ public class TestDataBag {
bg4.add(iter.next());
}
assertEquals(bg3, bg4);
-
+
// call iterator methods with irregular order
iter = bg3.iterator();
assertTrue(iter.hasNext());
@@ -913,46 +894,46 @@ public class TestDataBag {
assertFalse(iter.hasNext());
assertFalse(iter.hasNext());
assertEquals(bg3, bg5);
-
-
+
+
bg4.clear();
- assertEquals(bg4.size(), 0);
+ assertEquals(bg4.size(), 0);
}
-
- public void testInternalSortedBag() throws Exception {
-
+
+ public void testInternalSortedBag() throws Exception {
+
// check adding empty tuple
DataBag bg0 = new InternalSortedBag();
bg0.add(TupleFactory.getInstance().newTuple());
bg0.add(TupleFactory.getInstance().newTuple());
assertEquals(bg0.size(), 2);
-
+
// check equal of bags
DataBag bg1 = new InternalSortedBag();
assertEquals(bg1.size(), 0);
-
+
String[][] tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"c", "d" }};
for (int i = 0; i < tupleContents.length; i++) {
bg1.add(Util.createTuple(tupleContents[i]));
}
-
+
// check size, and isSorted(), isDistinct()
assertEquals(bg1.size(), 3);
assertTrue(bg1.isSorted());
assertFalse(bg1.isDistinct());
-
+
tupleContents = new String[][] {{"c", "d" }, {"a", "b"},{ "e", "f"} };
DataBag bg2 = new InternalSortedBag();
for (int i = 0; i < tupleContents.length; i++) {
bg2.add(Util.createTuple(tupleContents[i]));
}
assertEquals(bg1, bg2);
-
+
Iterator<Tuple> iter = bg1.iterator();
iter.next().equals(Util.createTuple(new String[] {"a", "b"}));
iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
-
+
// check bag with data written to disk
DataBag bg3 = new InternalSortedBag(1, 0.0f, null);
tupleContents = new String[][] {{ "e", "f"}, {"c", "d" }, {"a", "b"}};
@@ -960,17 +941,17 @@ public class TestDataBag {
bg3.add(Util.createTuple(tupleContents[i]));
}
assertEquals(bg1, bg3);
-
+
iter = bg3.iterator();
iter.next().equals(Util.createTuple(new String[] {"a", "b"}));
iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
- iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
-
+ iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
+
// call iterator methods with irregular order
iter = bg3.iterator();
assertTrue(iter.hasNext());
assertTrue(iter.hasNext());
-
+
DataBag bg4 = new InternalSortedBag(1, 0.0f, null);
bg4.add(iter.next());
bg4.add(iter.next());
@@ -978,21 +959,21 @@ public class TestDataBag {
bg4.add(iter.next());
assertFalse(iter.hasNext());
assertFalse(iter.hasNext());
- assertEquals(bg3, bg4);
-
+ assertEquals(bg3, bg4);
+
// check clear
bg3.clear();
assertEquals(bg3.size(), 0);
-
+
// test with all data spill out
- DataBag bg5 = new InternalSortedBag();
+ DataBag bg5 = new InternalSortedBag();
for(int j=0; j<3; j++) {
for (int i = 0; i < tupleContents.length; i++) {
bg5.add(Util.createTuple(tupleContents[i]));
- }
+ }
bg5.spill();
}
-
+
assertEquals(bg5.size(), 9);
iter = bg5.iterator();
for(int i=0; i<3; i++) {
@@ -1002,21 +983,21 @@ public class TestDataBag {
iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
}
for(int i=0; i<3; i++) {
- iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
+ iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
}
-
+
// test with most data spill out, with some data in memory
// and merge of spill files
- DataBag bg6 = new InternalSortedBag();
+ DataBag bg6 = new InternalSortedBag();
for(int j=0; j<104; j++) {
for (int i = 0; i < tupleContents.length; i++) {
bg6.add(Util.createTuple(tupleContents[i]));
- }
+ }
if (j != 103) {
bg6.spill();
}
}
-
+
assertEquals(bg6.size(), 104*3);
iter = bg6.iterator();
for(int i=0; i<104; i++) {
@@ -1026,55 +1007,55 @@ public class TestDataBag {
iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
}
for(int i=0; i<104; i++) {
- iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
+ iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
}
-
+
// check two implementation of sorted bag can compare correctly
- DataBag bg7 = new SortedDataBag(null);
+ DataBag bg7 = new SortedDataBag(null);
for(int j=0; j<104; j++) {
for (int i = 0; i < tupleContents.length; i++) {
bg7.add(Util.createTuple(tupleContents[i]));
- }
+ }
if (j != 103) {
bg7.spill();
}
}
assertEquals(bg6, bg7);
}
-
- public void testInternalDistinctBag() throws Exception {
+
+ public void testInternalDistinctBag() throws Exception {
// check adding empty tuple
DataBag bg0 = new InternalDistinctBag();
bg0.add(TupleFactory.getInstance().newTuple());
bg0.add(TupleFactory.getInstance().newTuple());
assertEquals(bg0.size(), 1);
-
+
// check equal of bags
DataBag bg1 = new InternalDistinctBag();
assertEquals(bg1.size(), 0);
-
+
String[][] tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"e", "d" }, {"a", "b"}, {"e", "f"}};
for (int i = 0; i < tupleContents.length; i++) {
bg1.add(Util.createTuple(tupleContents[i]));
}
-
+
// check size, and isSorted(), isDistinct()
assertEquals(bg1.size(), 3);
assertFalse(bg1.isSorted());
assertTrue(bg1.isDistinct());
-
+
tupleContents = new String[][] {{"a", "b" }, {"e", "d"}, {"e", "d"}, { "e", "f"} };
DataBag bg2 = new InternalDistinctBag();
for (int i = 0; i < tupleContents.length; i++) {
bg2.add(Util.createTuple(tupleContents[i]));
}
assertEquals(bg1, bg2);
-
+
Iterator<Tuple> iter = bg1.iterator();
iter.next().equals(Util.createTuple(new String[] {"a", "b"}));
iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
-
+
// check bag with data written to disk
DataBag bg3 = new InternalDistinctBag(1, 0.0f);
tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"e", "d" }, {"a", "b"}, {"e", "f"}};
@@ -1083,13 +1064,13 @@ public class TestDataBag {
}
assertEquals(bg2, bg3);
assertEquals(bg3.size(), 3);
-
-
+
+
// call iterator methods with irregular order
iter = bg3.iterator();
assertTrue(iter.hasNext());
assertTrue(iter.hasNext());
-
+
DataBag bg4 = new InternalDistinctBag(1, 0.0f);
bg4.add(iter.next());
bg4.add(iter.next());
@@ -1097,73 +1078,73 @@ public class TestDataBag {
bg4.add(iter.next());
assertFalse(iter.hasNext());
assertFalse(iter.hasNext());
- assertEquals(bg3, bg4);
-
+ assertEquals(bg3, bg4);
+
// check clear
bg3.clear();
assertEquals(bg3.size(), 0);
-
+
// test with all data spill out
- DataBag bg5 = new InternalDistinctBag();
+ DataBag bg5 = new InternalDistinctBag();
for(int j=0; j<3; j++) {
for (int i = 0; i < tupleContents.length; i++) {
bg5.add(Util.createTuple(tupleContents[i]));
- }
+ }
bg5.spill();
}
-
+
assertEquals(bg5.size(), 3);
-
-
+
+
// test with most data spill out, with some data in memory
// and merge of spill files
- DataBag bg6 = new InternalDistinctBag();
+ DataBag bg6 = new InternalDistinctBag();
for(int j=0; j<104; j++) {
for (int i = 0; i < tupleContents.length; i++) {
bg6.add(Util.createTuple(tupleContents[i]));
- }
+ }
if (j != 103) {
bg6.spill();
}
}
-
- assertEquals(bg6.size(), 3);
-
+
+ assertEquals(bg6.size(), 3);
+
// check two implementation of sorted bag can compare correctly
- DataBag bg7 = new DistinctDataBag();
+ DataBag bg7 = new DistinctDataBag();
for(int j=0; j<104; j++) {
for (int i = 0; i < tupleContents.length; i++) {
bg7.add(Util.createTuple(tupleContents[i]));
- }
+ }
if (j != 103) {
bg7.spill();
}
}
assertEquals(bg6, bg7);
}
-
+
// See PIG-1231
@Test
public void testDataBagIterIdempotent() throws Exception {
DataBag bg0 = new DefaultDataBag();
processDataBag(bg0, true);
-
+
DataBag bg1 = new DistinctDataBag();
processDataBag(bg1, true);
-
+
DataBag bg2 = new InternalDistinctBag();
processDataBag(bg2, true);
-
+
DataBag bg3 = new InternalSortedBag();
processDataBag(bg3, true);
-
+
DataBag bg4 = new SortedDataBag(null);
processDataBag(bg4, true);
-
+
DataBag bg5 = new InternalCachedBag(0, 0);
processDataBag(bg5, false);
}
-
+
// See PIG-1285
@Test
public void testSerializeSingleTupleBag() throws Exception {
@@ -1178,7 +1159,7 @@ public class TestDataBag {
dfBag.readFields(dis);
assertTrue(dfBag.equals(stBag));
}
-
+
// See PIG-2550
static class MyCustomTuple extends DefaultTuple {
private static final long serialVersionUID = 8156382697467819543L;
@@ -1203,23 +1184,7 @@ public class TestDataBag {
Tuple t2 = iter.next();
assertTrue(t2.equals(t));
}
-
- // See PIG-4260
- @Test
- public void testSpillArrayBackedList() throws Exception {
- Tuple[] tuples = new Tuple[2];
- tuples[0] = TupleFactory.getInstance().newTuple(1);
- tuples[0].set(0, "first");
- tuples[1] = TupleFactory.getInstance().newTuple(1);
- tuples[1].set(0, "second");
- DefaultDataBag bag = new DefaultDataBag(Arrays.asList(tuples));
- bag.spill();
- Iterator<Tuple> iter = bag.iterator();
- assertEquals(tuples[0], iter.next());
- assertEquals(tuples[1], iter.next());
- assertFalse(iter.hasNext());
- }
-
+
void processDataBag(DataBag bg, boolean doSpill) {
Tuple t = TupleFactory.getInstance().newTuple(new Integer(0));
bg.add(t);
@@ -1229,7 +1194,7 @@ public class TestDataBag {
assertTrue(iter.hasNext());
iter.next();
assertFalse(iter.hasNext());
- assertFalse("hasNext should be idempotent", iter.hasNext());
+ assertFalse("hasNext should be idempotent", iter.hasNext());
}
}
Modified: pig/branches/spark/test/org/apache/pig/test/TestDivide.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestDivide.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestDivide.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestDivide.java Fri Feb 24 03:34:37 2017
@@ -20,9 +20,6 @@ package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
-import java.math.BigDecimal;
-import java.math.MathContext;
-import java.math.RoundingMode;
import java.util.Map;
import java.util.Random;
@@ -56,7 +53,7 @@ public class TestDivide {
public void testOperator() throws ExecException {
// int TRIALS = 10;
byte[] types = { DataType.BAG, DataType.BOOLEAN, DataType.BYTEARRAY, DataType.CHARARRAY,
- DataType.DOUBLE, DataType.FLOAT, DataType.INTEGER, DataType.LONG, DataType.BIGDECIMAL,
+ DataType.DOUBLE, DataType.FLOAT, DataType.INTEGER, DataType.LONG,
DataType.DATETIME, DataType.MAP, DataType.TUPLE };
// Map<Byte,String> map = GenRandomData.genTypeToNameMap();
System.out.println("Testing DIVIDE operator");
@@ -253,33 +250,6 @@ public class TestDivide {
assertEquals(null, (Long)resl.result);
break;
}
- case DataType.BIGDECIMAL: {
- MathContext mc = new MathContext(Divide.BIGDECIMAL_MINIMAL_SCALE, RoundingMode.HALF_UP);
- BigDecimal inpf1 = new BigDecimal(r.nextDouble(),mc);
- BigDecimal inpf2 = new BigDecimal(r.nextDouble(),mc);
- lt.setValue(inpf1);
- rt.setValue(inpf2);
- Result resf = op.getNextBigDecimal();
- BigDecimal expected = inpf1.divide(inpf2, 2 * Divide.BIGDECIMAL_MINIMAL_SCALE + 1, RoundingMode.HALF_UP);
- assertEquals(expected, (BigDecimal)resf.result);
-
- // test with null in lhs
- lt.setValue(null);
- rt.setValue(inpf2);
- resf = op.getNextBigDecimal();
- assertEquals(null, (BigDecimal)resf.result);
- // test with null in rhs
- lt.setValue(inpf1);
- rt.setValue(null);
- resf = op.getNextBigDecimal();
- assertEquals(null, (BigDecimal)resf.result);
- // test divide by 0
- lt.setValue(inpf1);
- rt.setValue(new BigDecimal(0.0f,mc));
- resf = op.getNextBigDecimal();
- assertEquals(null, (BigDecimal)resf.result);
- break;
- }
case DataType.DATETIME:
DateTime inpdt1 = new DateTime(r.nextLong());
DateTime inpdt2 = new DateTime(r.nextLong());
Modified: pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java Fri Feb 24 03:34:37 2017
@@ -23,13 +23,13 @@ import static org.junit.Assert.assertTru
import java.io.File;
import java.io.FileWriter;
-import java.io.IOException;
import java.io.PrintWriter;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.pig.PigRunner;
+import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
import org.junit.AfterClass;
@@ -38,15 +38,16 @@ import org.junit.Test;
public class TestEmptyInputDir {
- private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
+ private static MiniCluster cluster;
private static final String EMPTY_DIR = "emptydir";
private static final String INPUT_FILE = "input";
private static final String OUTPUT_FILE = "output";
private static final String PIG_FILE = "test.pig";
-
+
@BeforeClass
public static void setUpBeforeClass() throws Exception {
+ cluster = MiniCluster.buildCluster();
FileSystem fs = cluster.getFileSystem();
if (!fs.mkdirs(new Path(EMPTY_DIR))) {
throw new Exception("failed to create empty dir");
@@ -63,35 +64,7 @@ public class TestEmptyInputDir {
public static void tearDownAfterClass() throws Exception {
cluster.shutDown();
}
-
- @Test
- public void testGroupBy() throws Exception {
- PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
- w.println("A = load '" + EMPTY_DIR + "';");
- w.println("B = group A by $0;");
- w.println("store B into '" + OUTPUT_FILE + "';");
- w.close();
-
- try {
- String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
- PigStats stats = PigRunner.run(args, null);
-
- assertTrue(stats.isSuccessful());
-
- // This assert fails on 205 due to MAPREDUCE-3606
- if (Util.isMapredExecType(cluster.getExecType())
- && !Util.isHadoop205() && !Util.isHadoop1_x()) {
- MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0);
- assertEquals(0, js.getNumberMaps());
- }
-
- assertEmptyOutputFile();
- } finally {
- new File(PIG_FILE).delete();
- Util.deleteFile(cluster, OUTPUT_FILE);
- }
- }
-
+
@Test
public void testSkewedJoin() throws Exception {
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -100,28 +73,31 @@ public class TestEmptyInputDir {
w.println("C = join B by $0, A by $0 using 'skewed';");
w.println("store C into '" + OUTPUT_FILE + "';");
w.close();
-
+
try {
- String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
+ String[] args = { PIG_FILE };
PigStats stats = PigRunner.run(args, null);
-
+
assertTrue(stats.isSuccessful());
-
+ // the sampler job has zero maps
+ MRJobStats js = (MRJobStats)stats.getJobGraph().getSources().get(0);
+
// This assert fails on 205 due to MAPREDUCE-3606
- if (Util.isMapredExecType(cluster.getExecType())
- && !Util.isHadoop205() && !Util.isHadoop1_x()) {
- // the sampler job has zero maps
- MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0);
- assertEquals(0, js.getNumberMaps());
- }
-
- assertEmptyOutputFile();
+ if (!Util.isHadoop205()&&!Util.isHadoop1_x())
+ assertEquals(0, js.getNumberMaps());
+
+ FileSystem fs = cluster.getFileSystem();
+ FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
+ assertTrue(status.isDir());
+ assertEquals(0, status.getLen());
+ // output directory isn't empty
+ assertTrue(fs.listStatus(status.getPath()).length > 0);
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);
}
}
-
+
@Test
public void testMergeJoin() throws Exception {
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -130,28 +106,32 @@ public class TestEmptyInputDir {
w.println("C = join A by $0, B by $0 using 'merge';");
w.println("store C into '" + OUTPUT_FILE + "';");
w.close();
-
+
try {
- String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
+ String[] args = { PIG_FILE };
PigStats stats = PigRunner.run(args, null);
-
- assertTrue(stats.isSuccessful());
-
+
+ assertTrue(stats.isSuccessful());
+ // the indexer job has zero maps
+ MRJobStats js = (MRJobStats)stats.getJobGraph().getSources().get(0);
+
// This assert fails on 205 due to MAPREDUCE-3606
- if (Util.isMapredExecType(cluster.getExecType())
- && !Util.isHadoop205() && !Util.isHadoop1_x()) {
- // the indexer job has zero maps
- MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0);
- assertEquals(0, js.getNumberMaps());
- }
-
- assertEmptyOutputFile();
+ if (!Util.isHadoop205()&&!Util.isHadoop1_x())
+ assertEquals(0, js.getNumberMaps());
+
+ FileSystem fs = cluster.getFileSystem();
+ FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
+ assertTrue(status.isDir());
+ assertEquals(0, status.getLen());
+
+ // output directory isn't empty
+ assertTrue(fs.listStatus(status.getPath()).length > 0);
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);
}
}
-
+
@Test
public void testFRJoin() throws Exception {
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -160,44 +140,55 @@ public class TestEmptyInputDir {
w.println("C = join A by $0, B by $0 using 'repl';");
w.println("store C into '" + OUTPUT_FILE + "';");
w.close();
-
+
try {
- String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
+ String[] args = { PIG_FILE };
PigStats stats = PigRunner.run(args, null);
-
- assertTrue(stats.isSuccessful());
-
+
+ assertTrue(stats.isSuccessful());
+ // the indexer job has zero maps
+ MRJobStats js = (MRJobStats)stats.getJobGraph().getSources().get(0);
+
// This assert fails on 205 due to MAPREDUCE-3606
- if (Util.isMapredExecType(cluster.getExecType())
- && !Util.isHadoop205() && !Util.isHadoop1_x()) {
- MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0);
- assertEquals(0, js.getNumberMaps());
- }
-
- assertEmptyOutputFile();
+ if (!Util.isHadoop205()&&!Util.isHadoop1_x())
+ assertEquals(0, js.getNumberMaps());
+
+ FileSystem fs = cluster.getFileSystem();
+ FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
+ assertTrue(status.isDir());
+ assertEquals(0, status.getLen());
+
+ // output directory isn't empty
+ assertTrue(fs.listStatus(status.getPath()).length > 0);
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);
}
}
-
+
@Test
public void testRegularJoin() throws Exception {
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
w.println("A = load '" + INPUT_FILE + "';");
w.println("B = load '" + EMPTY_DIR + "';");
- w.println("C = join B by $0, A by $0 PARALLEL 0;");
+ w.println("C = join B by $0, A by $0;");
w.println("store C into '" + OUTPUT_FILE + "';");
w.close();
-
+
try {
- String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
+ String[] args = { PIG_FILE };
PigStats stats = PigRunner.run(args, null);
-
- assertTrue(stats.isSuccessful());
-
- assertEmptyOutputFile();
-
+
+ assertTrue(stats.isSuccessful());
+
+ FileSystem fs = cluster.getFileSystem();
+ FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
+ assertTrue(status.isDir());
+ assertEquals(0, status.getLen());
+
+ // output directory isn't empty
+ assertTrue(fs.listStatus(status.getPath()).length > 0);
+
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);
@@ -212,19 +203,19 @@ public class TestEmptyInputDir {
w.println("C = join B by $0 right outer, A by $0;");
w.println("store C into '" + OUTPUT_FILE + "';");
w.close();
-
+
try {
- String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
+ String[] args = { PIG_FILE };
PigStats stats = PigRunner.run(args, null);
-
- assertTrue(stats.isSuccessful());
- assertEquals(2, stats.getNumberRecords(OUTPUT_FILE));
+
+ assertTrue(stats.isSuccessful());
+ assertEquals(2, stats.getNumberRecords(OUTPUT_FILE));
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);
}
}
-
+
@Test
public void testLeftOuterJoin() throws Exception {
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -233,88 +224,16 @@ public class TestEmptyInputDir {
w.println("C = join B by $0 left outer, A by $0;");
w.println("store C into '" + OUTPUT_FILE + "';");
w.close();
-
- try {
- String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
- PigStats stats = PigRunner.run(args, null);
-
- assertTrue(stats.isSuccessful());
- assertEquals(0, stats.getNumberRecords(OUTPUT_FILE));
- } finally {
- new File(PIG_FILE).delete();
- Util.deleteFile(cluster, OUTPUT_FILE);
- }
- }
-
- @Test
- public void testBloomJoin() throws Exception {
- PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
- w.println("A = load '" + INPUT_FILE + "' as (x:int);");
- w.println("B = load '" + EMPTY_DIR + "' as (x:int);");
- w.println("C = join B by $0, A by $0 using 'bloom';");
- w.println("D = join A by $0, B by $0 using 'bloom';");
- w.println("store C into '" + OUTPUT_FILE + "';");
- w.println("store D into 'output1';");
- w.close();
-
- try {
- String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
- PigStats stats = PigRunner.run(args, null);
-
- assertTrue(stats.isSuccessful());
- assertEquals(0, stats.getNumberRecords(OUTPUT_FILE));
- assertEquals(0, stats.getNumberRecords("output1"));
- assertEmptyOutputFile();
- } finally {
- new File(PIG_FILE).delete();
- Util.deleteFile(cluster, OUTPUT_FILE);
- Util.deleteFile(cluster, "output1");
- }
- }
-
- @Test
- public void testBloomJoinOuter() throws Exception {
- PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
- w.println("A = load '" + INPUT_FILE + "' as (x:int);");
- w.println("B = load '" + EMPTY_DIR + "' as (x:int);");
- w.println("C = join B by $0 left outer, A by $0 using 'bloom';");
- w.println("D = join A by $0 left outer, B by $0 using 'bloom';");
- w.println("E = join B by $0 right outer, A by $0 using 'bloom';");
- w.println("F = join A by $0 right outer, B by $0 using 'bloom';");
- w.println("store C into '" + OUTPUT_FILE + "';");
- w.println("store D into 'output1';");
- w.println("store E into 'output2';");
- w.println("store F into 'output3';");
- w.close();
-
+
try {
- String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
+ String[] args = { PIG_FILE };
PigStats stats = PigRunner.run(args, null);
-
- assertTrue(stats.isSuccessful());
- assertEquals(0, stats.getNumberRecords(OUTPUT_FILE));
- assertEquals(2, stats.getNumberRecords("output1"));
- assertEquals(2, stats.getNumberRecords("output2"));
- assertEquals(0, stats.getNumberRecords("output3"));
- assertEmptyOutputFile();
+
+ assertTrue(stats.isSuccessful());
+ assertEquals(0, stats.getNumberRecords(OUTPUT_FILE));
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);
- Util.deleteFile(cluster, "output1");
- Util.deleteFile(cluster, "output2");
- Util.deleteFile(cluster, "output3");
}
}
-
- private void assertEmptyOutputFile() throws IllegalArgumentException, IOException {
- FileSystem fs = cluster.getFileSystem();
- FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
- assertTrue(status.isDir());
- assertEquals(0, status.getLen());
- // output directory isn't empty. Has one empty file
- FileStatus[] files = fs.listStatus(status.getPath(), Util.getSuccessMarkerPathFilter());
- assertEquals(1, files.length);
- assertEquals(0, files[0].getLen());
- assertTrue(files[0].getPath().getName().startsWith("part-"));
- }
}
Modified: pig/branches/spark/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java Fri Feb 24 03:34:37 2017
@@ -200,11 +200,11 @@ public class TestErrorHandlingStoreFunc
private void updatePigProperties(boolean allowErrors, long minErrors,
double errorThreshold) {
Properties properties = pigServer.getPigContext().getProperties();
- properties.put(PigConfiguration.PIG_ERROR_HANDLING_ENABLED,
+ properties.put(PigConfiguration.PIG_ALLOW_STORE_ERRORS,
Boolean.toString(allowErrors));
- properties.put(PigConfiguration.PIG_ERROR_HANDLING_MIN_ERROR_RECORDS,
+ properties.put(PigConfiguration.PIG_ERRORS_MIN_RECORDS,
Long.toString(minErrors));
- properties.put(PigConfiguration.PIG_ERROR_HANDLING_THRESHOLD_PERCENT,
+ properties.put(PigConfiguration.PIG_ERROR_THRESHOLD_PERCENT,
Double.toString(errorThreshold));
}
}
Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java Fri Feb 24 03:34:37 2017
@@ -291,7 +291,7 @@ public class TestEvalPipeline {
myMap.put("long", new Long(1));
myMap.put("float", new Float(1.0));
myMap.put("double", new Double(1.0));
- myMap.put("dba", new DataByteArray(new String("1234").getBytes()));
+ myMap.put("dba", new DataByteArray(new String("bytes").getBytes()));
myMap.put("map", mapInMap);
myMap.put("tuple", tuple);
myMap.put("bag", bag);
@@ -794,31 +794,32 @@ public class TestEvalPipeline {
}
@Test
- public void testMapUDFWithImplicitTypeCast() throws Exception{
+ public void testMapUDFfail() throws Exception{
int LOOP_COUNT = 2;
File tmpFile = Util.createTempFileDelOnExit("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
for(int i = 0; i < LOOP_COUNT; i++) {
- ps.println(i);
+ for(int j=0;j<LOOP_COUNT;j+=2){
+ ps.println(i+"\t"+j);
+ ps.println(i+"\t"+j);
+ }
}
ps.close();
pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigContext) + "';");
pigServer.registerQuery("B = foreach A generate " + MapUDF.class.getName() + "($0) as mymap;"); //the argument does not matter
- String query = "C = foreach B generate mymap#'dba' * 10; ";
+ String query = "C = foreach B {"
+ + "generate mymap#'dba' * 10;"
+ + "};";
pigServer.registerQuery(query);
-
- Iterator<Tuple> iter = pigServer.openIterator("C");
- if(!iter.hasNext()) Assert.fail("No output found");
- int numIdentity = 0;
- while(iter.hasNext()){
- Tuple t = iter.next();
- Assert.assertEquals(new Integer(12340), (Integer)t.get(0));
- ++numIdentity;
+ try {
+ pigServer.openIterator("C");
+ Assert.fail("Error expected.");
+ } catch (Exception e) {
+ e.getMessage().contains("Cannot determine");
}
- Assert.assertEquals(LOOP_COUNT, numIdentity);
}
@Test
Modified: pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java Fri Feb 24 03:34:37 2017
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.InternalMap;
@@ -37,10 +38,10 @@ import org.apache.pig.impl.builtin.FindQ
import org.junit.Test;
public class TestFindQuantiles {
-
+
private static TupleFactory tFact = TupleFactory.getInstance();
private static final float epsilon = 0.0001f;
-
+
@Test
public void testFindQuantiles() throws Exception {
final int numSamples = 97778;
@@ -49,7 +50,7 @@ public class TestFindQuantiles {
System.out.println("sum: " + sum);
assertTrue(sum > (1-epsilon) && sum < (1+epsilon));
}
-
+
@Test
public void testFindQuantiles2() throws Exception {
final int numSamples = 30000;
@@ -85,7 +86,7 @@ public class TestFindQuantiles {
}
private float[] getProbVec(Tuple values) throws Exception {
- float[] probVec = new float[values.size()];
+ float[] probVec = new float[values.size()];
for(int i = 0; i < values.size(); i++) {
probVec[i] = (Float)values.get(i);
}
@@ -94,7 +95,7 @@ public class TestFindQuantiles {
private DataBag generateRandomSortedSamples(int numSamples, int max) throws Exception {
Random rand = new Random(1000);
- List<Tuple> samples = new ArrayList<Tuple>();
+ List<Tuple> samples = new ArrayList<Tuple>();
for (int i=0; i<numSamples; i++) {
Tuple t = tFact.newTuple(1);
t.set(0, rand.nextInt(max));
@@ -105,7 +106,7 @@ public class TestFindQuantiles {
}
private DataBag generateUniqueSamples(int numSamples) throws Exception {
- DataBag samples = BagFactory.getInstance().newDefaultBag();
+ DataBag samples = BagFactory.getInstance().newDefaultBag();
for (int i=0; i<numSamples; i++) {
Tuple t = tFact.newTuple(1);
t.set(0, new Integer(23));
@@ -120,9 +121,9 @@ public class TestFindQuantiles {
in.set(0, new Integer(numReduceres));
in.set(1, samples);
-
+
FindQuantiles fq = new FindQuantiles();
-
+
Map<String, Object> res = fq.exec(in);
return res;
}
@@ -134,11 +135,12 @@ public class TestFindQuantiles {
InternalMap weightedPartsData = (InternalMap) res.get(FindQuantiles.WEIGHTED_PARTS);
Iterator<Object> it = weightedPartsData.values().iterator();
float[] probVec = getProbVec((Tuple)it.next());
+ new DiscreteProbabilitySampleGenerator(probVec);
float sum = 0.0f;
for (float f : probVec) {
sum += f;
}
return sum;
}
-
+
}
Modified: pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java Fri Feb 24 03:34:37 2017
@@ -30,7 +30,6 @@ import java.util.List;
import java.util.Random;
import org.apache.pig.PigServer;
-import org.apache.pig.builtin.mock.Storage;
import org.apache.pig.data.Tuple;
import org.apache.pig.test.utils.TestHelper;
import org.junit.Test;
@@ -106,31 +105,6 @@ public class TestForEachNestedPlanLocal
}
@Test
- public void testNestedCrossTwoRelationsLimit() throws Exception {
- Storage.Data data = Storage.resetData(pig);
- data.set("input",
- Storage.tuple(Storage.bag(Storage.tuple(1, 1), Storage.tuple(1, 2)), Storage.bag(Storage.tuple(1, 3), Storage.tuple(1, 4))),
- Storage.tuple(Storage.bag(Storage.tuple(2, 1), Storage.tuple(2, 2)), Storage.bag(Storage.tuple(2, 3))),
- Storage.tuple(Storage.bag(Storage.tuple(3, 1)), Storage.bag(Storage.tuple(3, 2))));
-
- pig.setBatchOn();
- pig.registerQuery("A = load 'input' using mock.Storage() as (bag1:bag{tup1:tuple(f1:int, f2:int)}, bag2:bag{tup2:tuple(f3:int, f4:int)});");
- pig.registerQuery("B = foreach A {"
- + "crossed = cross bag1, bag2;"
- + "filtered = filter crossed by f1 == f3;"
- + "lmt = limit filtered 1;"
- + "generate FLATTEN(lmt);" + "}");
- pig.registerQuery("store B into 'output' using mock.Storage();");
-
- pig.executeBatch();
-
- List<Tuple> actualResults = data.get("output");
- List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {"(1, 1, 1, 3)", "(2, 1, 2, 3)", "(3, 1, 3, 2)"});
- Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
- }
-
- @Test
public void testNestedCrossTwoRelationsComplex() throws Exception {
File[] tmpFiles = generateDataSetFilesForNestedCross();
List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStringAsByteArray(new String[] {
Modified: pig/branches/spark/test/org/apache/pig/test/TestGFCross.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGFCross.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestGFCross.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestGFCross.java Fri Feb 24 03:34:37 2017
@@ -20,7 +20,6 @@ package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.conf.Configuration;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
@@ -51,7 +50,6 @@ public class TestGFCross {
public void testSerial() throws Exception {
Configuration cfg = new Configuration();
cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM + ".1", "1");
- cfg.set(MRConfiguration.TASK_ID, "task_1473802673416_1808_m_000000");
UDFContext.getUDFContext().addJobConf(cfg);
Tuple t = TupleFactory.getInstance().newTuple(2);
@@ -68,7 +66,6 @@ public class TestGFCross {
public void testParallelSet() throws Exception {
Configuration cfg = new Configuration();
cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM + ".1", "10");
- cfg.set(MRConfiguration.TASK_ID, "task_14738102975522_0001_r_000000");
UDFContext.getUDFContext().addJobConf(cfg);
Tuple t = TupleFactory.getInstance().newTuple(2);
Modified: pig/branches/spark/test/org/apache/pig/test/TestGrunt.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGrunt.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestGrunt.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestGrunt.java Fri Feb 24 03:34:37 2017
@@ -28,7 +28,6 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.FileWriter;
-import java.io.FilenameFilter;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
@@ -971,6 +970,7 @@ public class TestGrunt {
@Test
public void testStopOnFailure() throws Throwable {
+ Assume.assumeTrue("Skip this test for TEZ", Util.isMapredExecType(cluster.getExecType()));
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
context.getProperties().setProperty("stop.on.failure", ""+true);
@@ -1569,20 +1569,4 @@ public class TestGrunt {
}
assertTrue(found);
}
-
- @Test
- public void testGruntUtf8() throws Throwable {
- String command = "mkdir 测试\n" +
- "quit\n";
- System.setProperty("jline.WindowsTerminal.directConsole", "false");
- System.setIn(new ByteArrayInputStream(command.getBytes()));
- org.apache.pig.PigRunner.run(new String[] {"-x", "local"}, null);
- File[] partFiles = new File(".").listFiles(new FilenameFilter() {
- public boolean accept(File dir, String name) {
- return name.equals("测试");
- }
- });
- assertEquals(partFiles.length, 1);
- new File("测试").delete();
- }
}
Modified: pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java Fri Feb 24 03:34:37 2017
@@ -71,16 +71,12 @@ public class TestHBaseStorage {
private static final String TESTTABLE_1 = "pigtable_1";
private static final String TESTTABLE_2 = "pigtable_2";
private static final byte[] COLUMNFAMILY = Bytes.toBytes("pig");
- private static final byte[] COLUMNFAMILY2 = Bytes.toBytes("pig2");
private static final String TESTCOLUMN_A = "pig:col_a";
private static final String TESTCOLUMN_B = "pig:col_b";
private static final String TESTCOLUMN_C = "pig:col_c";
private static final int TEST_ROW_COUNT = 100;
- private enum TableType {ONE_CF, TWO_CF};
- private TableType lastTableType;
-
@BeforeClass
public static void setUp() throws Exception {
// This is needed by Pig
@@ -317,13 +313,13 @@ public class TestHBaseStorage {
*/
@Test
public void testLoadWithMap_3_col_prefix() throws IOException {
- prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText, TableType.TWO_CF);
+ prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
pig.registerQuery("a = load 'hbase://"
+ TESTTABLE_1
+ "' using "
+ "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
- + "pig2:* pig:prefixed_col_*"
+ + "pig:col_* pig:prefixed_col_*"
+ "','-loadKey') as (rowKey:chararray, pig_cf_map:map[], pig_prefix_cf_map:map[]);");
Iterator<Tuple> it = pig.openIterator("a");
int count = 0;
@@ -332,18 +328,24 @@ public class TestHBaseStorage {
Tuple t = it.next();
LOG.info("LoadFromHBase " + t);
String rowKey = t.get(0).toString();
- Map pig_secondery_cf_map = (Map) t.get(1);
+ Map pig_cf_map = (Map) t.get(1);
Map pig_prefix_cf_map = (Map) t.get(2);
Assert.assertEquals(3, t.size());
Assert.assertEquals("00".substring((count + "").length()) + count,
rowKey);
- Assert.assertEquals(count,
- Integer.parseInt(pig_secondery_cf_map.get("col_x").toString()));
Assert.assertEquals("PrefixedText_" + count,
((DataByteArray) pig_prefix_cf_map.get("prefixed_col_d")).toString());
Assert.assertEquals(1, pig_prefix_cf_map.size());
+ Assert.assertEquals(count,
+ Integer.parseInt(pig_cf_map.get("col_a").toString()));
+ Assert.assertEquals(count + 0.0,
+ Double.parseDouble(pig_cf_map.get("col_b").toString()), 1e-6);
+ Assert.assertEquals("Text_" + count,
+ ((DataByteArray) pig_cf_map.get("col_c")).toString());
+ Assert.assertEquals(3, pig_cf_map.size());
+
count++;
}
Assert.assertEquals(TEST_ROW_COUNT, count);
@@ -432,39 +434,6 @@ public class TestHBaseStorage {
LOG.info("LoadFromHBase done");
}
- public void testLoadWithFixedAndPrefixedCols3() throws IOException {
- prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
-
- pig.registerQuery("a = load 'hbase://"
- + TESTTABLE_1
- + "' using "
- + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
- + "pig:* pig:prefixed_col_*"
- + "','-loadKey') as (rowKey:chararray, pig_cf_map:map[], pig_prefix_cf_map:map[]);");
- Iterator<Tuple> it = pig.openIterator("a");
- int count = 0;
- LOG.info("LoadFromHBase Starting");
- while (it.hasNext()) {
- Tuple t = it.next();
- LOG.info("LoadFromHBase " + t);
- String rowKey = (String) t.get(0);
- Map pig_cf_map = (Map) t.get(1);
- Map pig_prefix_cf_map = (Map) t.get(2);
- Assert.assertEquals(3, t.size());
-
- Assert.assertEquals("00".substring((count + "").length()) + count,
- rowKey);
- Assert.assertEquals("PrefixedText_" + count,
- ((DataByteArray) pig_cf_map.get("prefixed_col_d")).toString());
- Assert.assertEquals(1, pig_cf_map.size());
- Assert.assertEquals(1, pig_prefix_cf_map.size());
-
- count++;
- }
- Assert.assertEquals(TEST_ROW_COUNT, count);
- LOG.info("LoadFromHBase done");
- }
-
/**
* * Test Load from hbase with map parameters and with a
* static column in different order
@@ -1517,36 +1486,22 @@ public class TestHBaseStorage {
+ "') as (rowKey:chararray,col_a:int, col_b:double, col_c:chararray);");
}
- private HTable prepareTable(String tableName, boolean initData,
- DataFormat format) throws IOException {
- return prepareTable(tableName, initData, format, TableType.ONE_CF);
- }
/**
* Prepare a table in hbase for testing.
*
*/
private HTable prepareTable(String tableName, boolean initData,
- DataFormat format, TableType type) throws IOException {
+ DataFormat format) throws IOException {
// define the table schema
HTable table = null;
try {
- if (lastTableType == type) {
- deleteAllRows(tableName);
- } else {
- util.deleteTable(tableName);
- }
+ deleteAllRows(tableName);
} catch (Exception e) {
// It's ok, table might not exist.
}
try {
- if (type == TableType.TWO_CF) {
- table = util.createTable(Bytes.toBytesBinary(tableName),
- new byte[][]{COLUMNFAMILY, COLUMNFAMILY2});
- } else {
- table = util.createTable(Bytes.toBytesBinary(tableName),
- COLUMNFAMILY);
- }
- lastTableType = type;
+ table = util.createTable(Bytes.toBytesBinary(tableName),
+ COLUMNFAMILY);
} catch (Exception e) {
table = new HTable(conf, Bytes.toBytesBinary(tableName));
}
@@ -1573,11 +1528,6 @@ public class TestHBaseStorage {
// prefixed_col_d: string type
put.add(COLUMNFAMILY, Bytes.toBytes("prefixed_col_d"),
Bytes.toBytes("PrefixedText_" + i));
- // another cf
- if (type == TableType.TWO_CF) {
- put.add(COLUMNFAMILY2, Bytes.toBytes("col_x"),
- Bytes.toBytes(i));
- }
table.put(put);
} else {
// row key: string type
@@ -1598,11 +1548,6 @@ public class TestHBaseStorage {
// prefixed_col_d: string type
put.add(COLUMNFAMILY, Bytes.toBytes("prefixed_col_d"),
("PrefixedText_" + i).getBytes());
- // another cf
- if (type == TableType.TWO_CF) {
- put.add(COLUMNFAMILY2, Bytes.toBytes("col_x"),
- (i + "").getBytes());
- }
table.put(put);
}
}
Modified: pig/branches/spark/test/org/apache/pig/test/TestJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestJobControlCompiler.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestJobControlCompiler.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestJobControlCompiler.java Fri Feb 24 03:34:37 2017
@@ -63,6 +63,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
@@ -130,7 +131,7 @@ public class TestJobControlCompiler {
// verifying the jar gets on distributed cache
Path[] fileClassPaths = DistributedCache.getFileClassPaths(jobConf);
// guava jar is not shipped with Hadoop 2.x
- Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), 5, fileClassPaths.length);
+ Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), HadoopShims.isHadoopYARN() ? 5 : 6, fileClassPaths.length);
Path distributedCachePath = fileClassPaths[0];
Assert.assertEquals("ends with jar name: "+distributedCachePath, distributedCachePath.getName(), tmpFile.getName());
// hadoop bug requires path to not contain hdfs://hotname in front
@@ -234,12 +235,22 @@ public class TestJobControlCompiler {
// 4. another.jar and 5. udf1.jar, and not duplicate udf.jar
System.out.println("cache.files= " + Arrays.toString(cacheURIs));
System.out.println("classpath.files= " + Arrays.toString(fileClassPaths));
- // Default jars - 5 (pig, antlr, joda-time, automaton)
- // Other jars - 10 (udf.jar#udf.jar, udf1.jar#diffname.jar, udf2.jar, udf1.jar, another.jar
- Assert.assertEquals("size 9 for " + Arrays.toString(cacheURIs), 9,
- Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size());
- Assert.assertEquals("size 9 for " + Arrays.toString(fileClassPaths), 9,
- Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size());
+ if (HadoopShims.isHadoopYARN()) {
+ // Default jars - 5 (pig, antlr, joda-time, automaton)
+ // Other jars - 10 (udf.jar#udf.jar, udf1.jar#diffname.jar, udf2.jar, udf1.jar, another.jar
+ Assert.assertEquals("size 9 for " + Arrays.toString(cacheURIs), 9,
+ Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size());
+ Assert.assertEquals("size 9 for " + Arrays.toString(fileClassPaths), 9,
+ Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size());
+ } else {
+ // Default jars - 5. Has guava in addition
+ // There will be same entries duplicated for udf.jar and udf2.jar
+ Assert.assertEquals("size 12 for " + Arrays.toString(cacheURIs), 12,
+ Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size());
+ Assert.assertEquals("size 12 for " + Arrays.toString(fileClassPaths), 12,
+ Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size());
+ }
+
// Count occurrences of the resources
Map<String, Integer> occurrences = new HashMap<String, Integer>();
@@ -248,12 +259,22 @@ public class TestJobControlCompiler {
val = (val == null) ? 1 : ++val;
occurrences.put(cacheURI.toString(), val);
}
- Assert.assertEquals(9, occurrences.size());
+ if (HadoopShims.isHadoopYARN()) {
+ Assert.assertEquals(9, occurrences.size());
+ } else {
+ Assert.assertEquals(10, occurrences.size()); //guava jar in addition
+ }
for (String file : occurrences.keySet()) {
- // check that only single occurrence even though we added once to dist cache (simulating via Oozie)
- // and second time through pig register jar when there is symlink
- Assert.assertEquals("One occurrence for " + file, 1, (int) occurrences.get(file));
+ if (!HadoopShims.isHadoopYARN() && (file.endsWith("udf.jar") || file.endsWith("udf2.jar"))) {
+ // Same path added twice which is ok. It should not be a shipped to hdfs temp path.
+ // We assert path is same by checking count
+ Assert.assertEquals("Two occurrences for " + file, 2, (int) occurrences.get(file));
+ } else {
+ // check that only single occurrence even though we added once to dist cache (simulating via Oozie)
+ // and second time through pig register jar when there is symlink
+ Assert.assertEquals("One occurrence for " + file, 1, (int) occurrences.get(file));
+ }
}
}