You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datafu.apache.org by mh...@apache.org on 2014/05/26 04:52:20 UTC
[1/2] git commit: DATAFU-10 Upgrade to Pig 0.12.1
Repository: incubator-datafu
Updated Branches:
refs/heads/master c4149151e -> 412b14bc9
DATAFU-10 Upgrade to Pig 0.12.1
https://issues.apache.org/jira/browse/DATAFU-10
Project: http://git-wip-us.apache.org/repos/asf/incubator-datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-datafu/commit/412b14bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-datafu/tree/412b14bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-datafu/diff/412b14bc
Branch: refs/heads/master
Commit: 412b14bc9169a8f4f7433f02551cbbf698846f20
Parents: faab884
Author: Matt Hayes <mh...@linkedin.com>
Authored: Mon May 12 14:08:02 2014 -0700
Committer: Matt Hayes <mh...@linkedin.com>
Committed: Sun May 25 18:23:10 2014 -0700
----------------------------------------------------------------------
.../datafu/test/pig/hash/lsh/LSHPigTest.java | 2 +-
.../datafu/test/pig/sampling/SamplingTests.java | 166 +++++++++----------
.../WeightedReservoirSamplingTests.java | 146 ++++++++--------
gradle/dependency-versions.gradle | 2 +-
4 files changed, 158 insertions(+), 158 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/412b14bc/datafu-pig/src/test/java/datafu/test/pig/hash/lsh/LSHPigTest.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/hash/lsh/LSHPigTest.java b/datafu-pig/src/test/java/datafu/test/pig/hash/lsh/LSHPigTest.java
index a35c4f2..ac3e409 100644
--- a/datafu-pig/src/test/java/datafu/test/pig/hash/lsh/LSHPigTest.java
+++ b/datafu-pig/src/test/java/datafu/test/pig/hash/lsh/LSHPigTest.java
@@ -168,7 +168,7 @@ public class LSHPigTest extends PigTests
describe NEAR_NEIGHBORS;
NEIGHBORS_PROJ = foreach NEAR_NEIGHBORS {
- generate query_pt as query_pt, neighbor.pt as matching_pts;
+ generate TOTUPLE(query_pt) as query_pt, neighbor.pt as matching_pts;
};
describe NEIGHBORS_PROJ;
NOT_NULL = filter NEIGHBORS_PROJ by SIZE(matching_pts) > 0;
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/412b14bc/datafu-pig/src/test/java/datafu/test/pig/sampling/SamplingTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/sampling/SamplingTests.java b/datafu-pig/src/test/java/datafu/test/pig/sampling/SamplingTests.java
index 2f685d4..418a694 100644
--- a/datafu-pig/src/test/java/datafu/test/pig/sampling/SamplingTests.java
+++ b/datafu-pig/src/test/java/datafu/test/pig/sampling/SamplingTests.java
@@ -47,69 +47,69 @@ import datafu.test.pig.PigTests;
public class SamplingTests extends PigTests
{
/**
-
+
define WeightedSample datafu.pig.sampling.WeightedSample('1');
-
+
data = LOAD 'input' AS (A: bag {T: tuple(v1:chararray,v2:INT)});
-
+
data2 = FOREACH data GENERATE WeightedSample(A,1);
--describe data2;
-
+
STORE data2 INTO 'output';
*/
@Multiline
private String weightedSampleTest;
-
+
@Test
public void weightedSampleTest() throws Exception
{
PigTest test = createPigTestFromString(weightedSampleTest);
- writeLinesToFile("input",
+ writeLinesToFile("input",
"({(a, 100),(b, 1),(c, 5),(d, 2)})");
-
+
test.runScript();
-
+
assertOutput(test, "data2",
"({(a,100),(c,5),(b,1),(d,2)})");
}
-
+
/**
-
+
define WeightedSample datafu.pig.sampling.WeightedSample('1');
-
+
data = LOAD 'input' AS (A: bag {T: tuple(v1:chararray,v2:INT)});
-
+
data2 = FOREACH data GENERATE WeightedSample(A,1,3);
--describe data2;
-
+
STORE data2 INTO 'output';
*/
@Multiline
private String weightedSampleLimitTest;
-
+
@Test
public void weightedSampleLimitTest() throws Exception
{
PigTest test = createPigTestFromString(weightedSampleLimitTest);
- writeLinesToFile("input",
+ writeLinesToFile("input",
"({(a, 100),(b, 1),(c, 5),(d, 2)})");
-
+
test.runScript();
-
+
assertOutput(test, "data2",
"({(a,100),(c,5),(b,1)})");
}
-
+
@Test
public void weightedSampleLimitExecTest() throws IOException
{
WeightedSample sampler = new WeightedSample();
-
+
DataBag bag = BagFactory.getInstance().newDefaultBag();
for (int i=0; i<100; i++)
{
@@ -118,16 +118,16 @@ public class SamplingTests extends PigTests
t.set(1, 1); // score is equal for all
bag.add(t);
}
-
+
Tuple input = TupleFactory.getInstance().newTuple(3);
input.set(0, bag);
input.set(1, 1); // use index 1 for score
input.set(2, 10); // get 10 items
-
+
DataBag result = sampler.exec(input);
-
+
Assert.assertEquals(10, result.size());
-
+
// all must be found, no repeats
Set<Integer> found = new HashSet<Integer>();
for (Tuple t : result)
@@ -139,26 +139,26 @@ public class SamplingTests extends PigTests
found.add(i);
}
}
-
+
/**
-
-
+
+
DEFINE SampleByKey datafu.pig.sampling.SampleByKey('0.5', 'salt2.5');
-
+
data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
sampled = FILTER data BY SampleByKey(A_id);
-
+
STORE sampled INTO 'output';
*/
@Multiline
private String sampleByKeyTest;
-
+
@Test
public void sampleByKeyTest() throws Exception
{
PigTest test = createPigTestFromString(sampleByKeyTest);
-
+
writeLinesToFile("input",
"A1\tB1\t1","A1\tB1\t4","A1\tB3\t4","A1\tB4\t4",
"A2\tB1\t4","A2\tB2\t4",
@@ -170,9 +170,9 @@ public class SamplingTests extends PigTests
"A8\tB1\t4","A8\tB2\t45",
"A9\tB3\t92", "A9\tB1\t42","A9\tB2\t1","A9\tB3\t0",
"A10\tB1\t7","A10\tB2\t23","A10\tB3\t1","A10\tB4\t41","A10\tB5\t52");
-
+
test.runScript();
- assertOutput(test, "sampled",
+ assertOutput(test, "sampled",
"(A4,B1,3)","(A4,B2,3)","(A4,B3,59)","(A4,B4,29)",
"(A5,B1,4)",
"(A6,B2,3)","(A6,B2,55)","(A6,B3,1)",
@@ -181,24 +181,24 @@ public class SamplingTests extends PigTests
}
/**
-
-
+
+
DEFINE SampleByKey datafu.pig.sampling.SampleByKey('0.5', 'salt2.5');
-
+
data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
sampled = FILTER data BY SampleByKey(A_id, B_id);
-
+
STORE sampled INTO 'output';
*/
@Multiline
private String sampleByKeyMultipleKeyTest;
-
+
@Test
public void sampleByKeyMultipleKeyTest() throws Exception
{
PigTest test = createPigTestFromString(sampleByKeyMultipleKeyTest);
-
+
writeLinesToFile("input",
"A1\tB1\t1","A1\tB1\t4",
"A1\tB3\t4",
@@ -226,7 +226,7 @@ public class SamplingTests extends PigTests
"A10\tB6\t41",
"A10\tB7\t52");
test.runScript();
- assertOutput(test, "sampled",
+ assertOutput(test, "sampled",
"(A1,B1,1)","(A1,B1,4)",
"(A1,B4,4)",
"(A2,B1,4)",
@@ -240,16 +240,16 @@ public class SamplingTests extends PigTests
"(A9,B3,92)","(A9,B3,0)",
"(A10,B2,23)","(A10,B2,1)","(A10,B2,31)"
);
-
+
}
-
+
@Test
public void sampleByKeyExecTest() throws Exception
{
SampleByKey sampler = new SampleByKey("0.10", "thesalt");
-
+
Map<Integer,Integer> valuesPerKey = new HashMap<Integer,Integer>();
-
+
// 10,000 keys total
for (int i=0; i<10000; i++)
{
@@ -259,7 +259,7 @@ public class SamplingTests extends PigTests
Tuple t = TupleFactory.getInstance().newTuple(1);
t.set(0, i);
if (sampler.exec(t))
- {
+ {
if (valuesPerKey.containsKey(i))
{
valuesPerKey.put(i, valuesPerKey.get(i)+1);
@@ -271,22 +271,22 @@ public class SamplingTests extends PigTests
}
}
}
-
+
// 10% sample, so should have roughly 1000 keys
Assert.assertTrue(Math.abs(1000-valuesPerKey.size()) < 50);
-
+
// every value should be present for the same key
for (Map.Entry<Integer, Integer> pair : valuesPerKey.entrySet())
{
Assert.assertEquals(5, pair.getValue().intValue());
}
}
-
+
/**
-
+
DEFINE ReservoirSample datafu.pig.sampling.ReservoirSample('$RESERVOIR_SIZE');
-
+
data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
sampled = FOREACH (GROUP data ALL) GENERATE ReservoirSample(data) as sample_data;
sampled = FOREACH sampled GENERATE COUNT(sample_data) AS sample_count;
@@ -295,11 +295,11 @@ public class SamplingTests extends PigTests
*/
@Multiline
private String reservoirSampleTest;
-
+
@Test
public void reservoirSampleTest() throws Exception
{
-
+
writeLinesToFile("input",
"A1\tB1\t1",
"A1\tB1\t4",
@@ -333,7 +333,7 @@ public class SamplingTests extends PigTests
"A10\tB2\t31",
"A10\tB6\t41",
"A10\tB7\t52");
-
+
for(int i=10; i<=30; i=i+10){
int reservoirSize = i ;
PigTest test = createPigTestFromString(reservoirSampleTest, "RESERVOIR_SIZE="+reservoirSize);
@@ -341,36 +341,36 @@ public class SamplingTests extends PigTests
assertOutput(test, "sampled", "("+reservoirSize+")");
}
}
-
+
/**
-
+
DEFINE ReservoirSample datafu.pig.sampling.ReservoirSample('$RESERVOIR_SIZE');
- DEFINE Assert datafu.pig.util.Assert();
-
+ DEFINE AssertUDF datafu.pig.util.AssertUDF();
+
data = LOAD 'input' AS (A_id:int, B_id:chararray, C:int);
sampled = FOREACH (GROUP data BY A_id) GENERATE group as A_id, ReservoirSample(data.(B_id,C)) as sample_data;
- sampled = FILTER sampled BY Assert((SIZE(sample_data) <= $RESERVOIR_SIZE ? 1 : 0), 'must be <= $RESERVOIR_SIZE');
+ sampled = FILTER sampled BY AssertUDF((SIZE(sample_data) <= $RESERVOIR_SIZE ? 1 : 0), 'must be <= $RESERVOIR_SIZE');
sampled = FOREACH sampled GENERATE A_id, FLATTEN(sample_data);
STORE sampled INTO 'output';
*/
@Multiline
private String reservoirSampleGroupTest;
-
+
/**
* Verifies that ReservoirSample works when data grouped by a key.
* In particular it ensures that the reservoir is not reused across keys.
- *
+ *
* <p>
* This confirms the fix for DATAFU-11.
* </p>
- *
+ *
* @throws Exception
*/
@Test
public void reservoirSampleGroupTest() throws Exception
- {
+ {
// first value is the key. last value matches the key so we can
// verify the register is reset for each key. values should not
// bleed across to other keys.
@@ -407,26 +407,26 @@ public class SamplingTests extends PigTests
"10\tB2\t10",
"10\tB6\t10",
"10\tB7\t10");
-
+
for(int i=1; i<=3; i++) {
int reservoirSize = i ;
PigTest test = createPigTestFromString(reservoirSampleGroupTest, "RESERVOIR_SIZE="+reservoirSize);
test.runScript();
-
+
List<Tuple> tuples = getLinesForAlias(test, "sampled");
-
+
for (Tuple tuple : tuples)
{
Assert.assertEquals(((Number)tuple.get(0)).intValue(), ((Number)tuple.get(2)).intValue());
}
}
}
-
+
@Test
public void reservoirSampleExecTest() throws IOException
{
ReservoirSample sampler = new ReservoirSample("10");
-
+
DataBag bag = BagFactory.getInstance().newDefaultBag();
for (int i=0; i<100; i++)
{
@@ -434,13 +434,13 @@ public class SamplingTests extends PigTests
t.set(0, i);
bag.add(t);
}
-
+
Tuple input = TupleFactory.getInstance().newTuple(bag);
-
+
DataBag result = sampler.exec(input);
-
+
Assert.assertEquals(10, result.size());
-
+
// all must be found, no repeats
Set<Integer> found = new HashSet<Integer>();
for (Tuple t : result)
@@ -452,12 +452,12 @@ public class SamplingTests extends PigTests
found.add(i);
}
}
-
+
@Test
public void reservoirSampleAccumulateTest() throws IOException
{
ReservoirSample sampler = new ReservoirSample("10");
-
+
for (int i=0; i<100; i++)
{
Tuple t = TupleFactory.getInstance().newTuple(1);
@@ -467,11 +467,11 @@ public class SamplingTests extends PigTests
Tuple input = TupleFactory.getInstance().newTuple(bag);
sampler.accumulate(input);
}
-
+
DataBag result = sampler.getValue();
-
+
Assert.assertEquals(10, result.size());
-
+
// all must be found, no repeats
Set<Integer> found = new HashSet<Integer>();
for (Tuple t : result)
@@ -483,14 +483,14 @@ public class SamplingTests extends PigTests
found.add(i);
}
}
-
+
@Test
public void reservoirSampleAlgebraicTest() throws IOException
{
ReservoirSample.Initial initialSampler = new ReservoirSample.Initial("10");
ReservoirSample.Intermediate intermediateSampler = new ReservoirSample.Intermediate("10");
ReservoirSample.Final finalSampler = new ReservoirSample.Final("10");
-
+
DataBag bag = BagFactory.getInstance().newDefaultBag();
for (int i=0; i<100; i++)
{
@@ -498,17 +498,17 @@ public class SamplingTests extends PigTests
t.set(0, i);
bag.add(t);
}
-
+
Tuple input = TupleFactory.getInstance().newTuple(bag);
-
- Tuple intermediateTuple = initialSampler.exec(input);
+
+ Tuple intermediateTuple = initialSampler.exec(input);
DataBag intermediateBag = BagFactory.getInstance().newDefaultBag(Arrays.asList(intermediateTuple));
- intermediateTuple = intermediateSampler.exec(TupleFactory.getInstance().newTuple(intermediateBag));
+ intermediateTuple = intermediateSampler.exec(TupleFactory.getInstance().newTuple(intermediateBag));
intermediateBag = BagFactory.getInstance().newDefaultBag(Arrays.asList(intermediateTuple));
DataBag result = finalSampler.exec(TupleFactory.getInstance().newTuple(intermediateBag));
-
+
Assert.assertEquals(10, result.size());
-
+
// all must be found, no repeats
Set<Integer> found = new HashSet<Integer>();
for (Tuple t : result)
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/412b14bc/datafu-pig/src/test/java/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java b/datafu-pig/src/test/java/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java
index 24e7ec7..7511324 100644
--- a/datafu-pig/src/test/java/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java
+++ b/datafu-pig/src/test/java/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java
@@ -39,40 +39,40 @@ import datafu.test.pig.PigTests;
/**
* Tests for {@link WeightedReservoirSample}.
*
- * @author wjian
+ * @author wjian
*
*/
public class WeightedReservoirSamplingTests extends PigTests
{
/**
-
+
DEFINE ReservoirSample datafu.pig.sampling.WeightedReservoirSample('$RESERVOIR_SIZE','2');
- DEFINE Assert datafu.pig.util.Assert();
-
+ DEFINE AssertUDF datafu.pig.util.AssertUDF();
+
data = LOAD 'input' AS (A_id:int, B_id:chararray, C:int, W:double);
sampled = FOREACH (GROUP data BY A_id) GENERATE group as A_id, ReservoirSample(data.(B_id,C,W)) as sample_data;
- sampled = FILTER sampled BY Assert((SIZE(sample_data) <= $RESERVOIR_SIZE ? 1 : 0), 'must be <= $RESERVOIR_SIZE');
+ sampled = FILTER sampled BY AssertUDF((SIZE(sample_data) <= $RESERVOIR_SIZE ? 1 : 0), 'must be <= $RESERVOIR_SIZE');
sampled = FOREACH sampled GENERATE A_id, FLATTEN(sample_data);
STORE sampled INTO 'output';
*/
@Multiline
private String weightedReservoirSampleGroupTest;
-
+
/**
* Verifies that WeightedReservoirSample works when data grouped by a key.
* In particular it ensures that the reservoir is not reused across keys.
- *
+ *
* <p>
* This confirms the fix for DATAFU-11.
* </p>
- *
+ *
* @throws Exception
*/
@Test
public void weightedReservoirSampleGroupTest() throws Exception
- {
+ {
// first value is the key. second to last value matches the key so we can
// verify the register is reset for each key. values should not
// bleed across to other keys.
@@ -109,39 +109,39 @@ public class WeightedReservoirSamplingTests extends PigTests
"10\tB2\t10\t1.0",
"10\tB6\t10\t1.0",
"10\tB7\t10\t1.0");
-
+
for(int i=1; i<=3; i++) {
int reservoirSize = i ;
PigTest test = createPigTestFromString(weightedReservoirSampleGroupTest, "RESERVOIR_SIZE="+reservoirSize);
test.runScript();
-
+
List<Tuple> tuples = getLinesForAlias(test, "sampled");
-
+
for (Tuple tuple : tuples)
{
Assert.assertEquals(((Number)tuple.get(0)).intValue(), ((Number)tuple.get(2)).intValue());
}
}
}
-
- /**
-
-
- define WeightedSample datafu.pig.sampling.WeightedReservoirSample('1','1');
+
+ /**
+
+
+ define WeightedSample datafu.pig.sampling.WeightedReservoirSample('1','1');
data = LOAD 'input' AS (v1:chararray,v2:INT);
data_g = group data all;
sampled = FOREACH data_g GENERATE WeightedSample(data);
- --describe sampled;
-
- STORE sampled INTO 'output';
-
- */
- @Multiline
- private String weightedSampleTest;
-
- @Test
- public void weightedSampleTest() throws Exception
+ --describe sampled;
+
+ STORE sampled INTO 'output';
+
+ */
+ @Multiline
+ private String weightedSampleTest;
+
+ @Test
+ public void weightedSampleTest() throws Exception
{
Map<String, Integer> count = new HashMap<String, Integer>();
@@ -150,9 +150,9 @@ public class WeightedReservoirSamplingTests extends PigTests
count.put("c", 0);
count.put("d", 0);
- PigTest test = createPigTestFromString(weightedSampleTest);
-
- writeLinesToFile("input",
+ PigTest test = createPigTestFromString(weightedSampleTest);
+
+ writeLinesToFile("input",
"a\t100","b\t1","c\t5","d\t2");
for(int i = 0; i < 10; i++) {
@@ -168,16 +168,16 @@ public class WeightedReservoirSamplingTests extends PigTests
Tuple st = sampleIter.next();
String key = (String)st.get(0);
count.put(key, count.get(key) + 1);
- }
+ }
}
String maxKey = "";
int maxCount = 0;
for(String key : count.keySet()) {
if(maxCount < count.get(key)) {
- maxKey = key;
+ maxKey = key;
maxCount = count.get(key);
- }
+ }
}
Assert.assertEquals(maxKey, "a");
@@ -200,7 +200,7 @@ public class WeightedReservoirSamplingTests extends PigTests
}
DataBag result = sampler.getValue();
- verifyNoRepeatAllFound(result, 10, 0, 100);
+ verifyNoRepeatAllFound(result, 10, 0, 100);
}
@Test
@@ -209,7 +209,7 @@ public class WeightedReservoirSamplingTests extends PigTests
WeightedReservoirSample.Initial initialSampler = new WeightedReservoirSample.Initial("10", "1");
WeightedReservoirSample.Intermediate intermediateSampler = new WeightedReservoirSample.Intermediate("10", "1");
WeightedReservoirSample.Final finalSampler = new WeightedReservoirSample.Final("10", "1");
-
+
DataBag bag = BagFactory.getInstance().newDefaultBag();
for (int i=0; i<100; i++)
{
@@ -218,15 +218,15 @@ public class WeightedReservoirSamplingTests extends PigTests
t.set(1, i + 1);
bag.add(t);
}
-
+
Tuple input = TupleFactory.getInstance().newTuple(bag);
-
- Tuple intermediateTuple = initialSampler.exec(input);
+
+ Tuple intermediateTuple = initialSampler.exec(input);
DataBag intermediateBag = BagFactory.getInstance().newDefaultBag(Arrays.asList(intermediateTuple));
- intermediateTuple = intermediateSampler.exec(TupleFactory.getInstance().newTuple(intermediateBag));
+ intermediateTuple = intermediateSampler.exec(TupleFactory.getInstance().newTuple(intermediateBag));
intermediateBag = BagFactory.getInstance().newDefaultBag(Arrays.asList(intermediateTuple));
DataBag result = finalSampler.exec(TupleFactory.getInstance().newTuple(intermediateBag));
- verifyNoRepeatAllFound(result, 10, 0, 100);
+ verifyNoRepeatAllFound(result, 10, 0, 100);
}
private void verifyNoRepeatAllFound(DataBag result,
@@ -235,7 +235,7 @@ public class WeightedReservoirSamplingTests extends PigTests
int right) throws ExecException
{
Assert.assertEquals(expectedResultSize, result.size());
-
+
// all must be found, no repeats
Set<Integer> found = new HashSet<Integer>();
for (Tuple t : result)
@@ -251,7 +251,7 @@ public class WeightedReservoirSamplingTests extends PigTests
public void weightedReservoirSampleLimitExecTest() throws IOException
{
WeightedReservoirSample sampler = new WeightedReservoirSample("100", "1");
-
+
DataBag bag = BagFactory.getInstance().newDefaultBag();
for (int i=0; i<10; i++)
{
@@ -260,13 +260,13 @@ public class WeightedReservoirSamplingTests extends PigTests
t.set(1, 1); // score is equal for all
bag.add(t);
}
-
+
Tuple input = TupleFactory.getInstance().newTuple(1);
input.set(0, bag);
-
+
DataBag result = sampler.exec(input);
-
- verifyNoRepeatAllFound(result, 10, 0, 10);
+
+ verifyNoRepeatAllFound(result, 10, 0, 10);
Set<Integer> found = new HashSet<Integer>();
for (Tuple t : result)
@@ -297,7 +297,7 @@ public class WeightedReservoirSamplingTests extends PigTests
{
PigTest test = createPigTestFromString(weightedSampleTest);
- writeLinesToFile("input",
+ writeLinesToFile("input",
"a\t100","b\t1","c\t0","d\t2");
try {
test.runScript();
@@ -307,28 +307,28 @@ public class WeightedReservoirSamplingTests extends PigTests
}
}
- /**
-
-
- define WeightedSample datafu.pig.sampling.WeightedReservoirSample('1','1');
+ /**
+
+
+ define WeightedSample datafu.pig.sampling.WeightedReservoirSample('1','1');
data = LOAD 'input' AS (v1:chararray);
data_g = group data all;
sampled = FOREACH data_g GENERATE WeightedSample(data);
- describe sampled;
-
- STORE sampled INTO 'output';
-
- */
- @Multiline
- private String invalidInputTupleSizeTest;
-
+ describe sampled;
+
+ STORE sampled INTO 'output';
+
+ */
+ @Multiline
+ private String invalidInputTupleSizeTest;
+
@Test
public void invalidInputTupleSizeTest() throws Exception
{
PigTest test = createPigTestFromString(invalidInputTupleSizeTest);
- writeLinesToFile("input",
+ writeLinesToFile("input",
"a","b","c","d");
try {
test.runScript();
@@ -339,28 +339,28 @@ public class WeightedReservoirSamplingTests extends PigTests
}
}
- /**
-
-
- define WeightedSample datafu.pig.sampling.WeightedReservoirSample('1','0');
+ /**
+
+
+ define WeightedSample datafu.pig.sampling.WeightedReservoirSample('1','0');
data = LOAD 'input' AS (v1:chararray, v2:INT);
data_g = group data all;
sampled = FOREACH data_g GENERATE WeightedSample(data);
- describe sampled;
-
- STORE sampled INTO 'output';
-
- */
- @Multiline
- private String invalidWeightFieldSchemaTest;
-
+ describe sampled;
+
+ STORE sampled INTO 'output';
+
+ */
+ @Multiline
+ private String invalidWeightFieldSchemaTest;
+
@Test
public void invalidWeightFieldSchemaTest() throws Exception
{
PigTest test = createPigTestFromString(invalidWeightFieldSchemaTest);
- writeLinesToFile("input",
+ writeLinesToFile("input",
"a\t100","b\t1","c\t5","d\t2");
try {
test.runScript();
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/412b14bc/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 848074c..eb24e4a 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -11,7 +11,7 @@ ext {
log4jVersion="1.2.17"
mavenVersion="2.1.3"
jlineVersion="0.9.94"
- pigVersion="0.11.1"
+ pigVersion="0.12.1"
testngVersion="6.2"
toolsVersion="1.4.2"
wagonHttpVersion="1.0-beta-2"
[2/2] git commit: Revert "DATAFU-50 SimpleEvalFunc should extend ContextualEvalFunc, have good lifecycle hooks"
Posted by mh...@apache.org.
Revert "DATAFU-50 SimpleEvalFunc should extend ContextualEvalFunc, have good lifecycle hooks"
This reverts commit a36cab397505837113a5193de52e6439bd12c831.
Reverted because the original commit caused multiple test failures.
Project: http://git-wip-us.apache.org/repos/asf/incubator-datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-datafu/commit/faab884c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-datafu/tree/faab884c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-datafu/diff/faab884c
Branch: refs/heads/master
Commit: faab884c278a3ab5197ec3cd8931f84a664f4904
Parents: c414915
Author: Matt Hayes <mh...@linkedin.com>
Authored: Sun May 25 17:56:44 2014 -0700
Committer: Matt Hayes <mh...@linkedin.com>
Committed: Sun May 25 18:23:10 2014 -0700
----------------------------------------------------------------------
.../java/datafu/pig/util/AliasableEvalFunc.java | 8 +--
.../datafu/pig/util/ContextualEvalFunc.java | 39 +-----------
.../java/datafu/pig/util/SimpleEvalFunc.java | 4 +-
.../datafu/test/pig/util/EvalFuncTests.java | 66 --------------------
.../datafu/test/pig/util/SchemaToString.java | 56 -----------------
5 files changed, 8 insertions(+), 165 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/faab884c/datafu-pig/src/main/java/datafu/pig/util/AliasableEvalFunc.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/util/AliasableEvalFunc.java b/datafu-pig/src/main/java/datafu/pig/util/AliasableEvalFunc.java
index 87bca33..ee2c3f3 100644
--- a/datafu-pig/src/main/java/datafu/pig/util/AliasableEvalFunc.java
+++ b/datafu-pig/src/main/java/datafu/pig/util/AliasableEvalFunc.java
@@ -90,12 +90,12 @@ public abstract class AliasableEvalFunc<T> extends ContextualEvalFunc<T>
}
/**
- * Hook to capture the input schema aliases
+ * A wrapper method which captures the schema and then calls getOutputSchema
*/
@Override
- public void onReady(Schema in_schema, Schema out_schema) {
- storeFieldAliases(in_schema);
- super.onReady(in_schema, out_schema);
+ public Schema outputSchema(Schema input) {
+ storeFieldAliases(input);
+ return getOutputSchema(input);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/faab884c/datafu-pig/src/main/java/datafu/pig/util/ContextualEvalFunc.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/util/ContextualEvalFunc.java b/datafu-pig/src/main/java/datafu/pig/util/ContextualEvalFunc.java
index e2b4392..c534b77 100644
--- a/datafu-pig/src/main/java/datafu/pig/util/ContextualEvalFunc.java
+++ b/datafu-pig/src/main/java/datafu/pig/util/ContextualEvalFunc.java
@@ -23,15 +23,12 @@ import java.util.Properties;
import org.apache.pig.EvalFunc;
import org.apache.pig.impl.util.UDFContext;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
/**
* An abstract class which enables UDFs to store instance properties
* on the front end which will be available on the back end.
- *
- * For example, you can override the onReady hook method to set properties
- * at front-end (i.e. at launch time) which will be available when exec() is
- * called (on the workers themselves.
+ * For example, properties may be set in the call to outputSchema(),
+ * which will be available when exec() is called.
*
* @param <T>
*/
@@ -83,36 +80,4 @@ public abstract class ContextualEvalFunc<T> extends EvalFunc<T>
private void setInstanceName(String instanceName) {
this.instanceName = instanceName;
}
-
- /**
- * Hook method, called once the input and output schema are prepared.
- *
- * Subclasses may override to set properties on the front end (i.e. at script
- * run time) that may be played with later (e.g. at execution time).
- *
- * Child classes must (a) call super.onReady(in_schema, out_schema) so
- * that the hook chains, and (b) not mess with the schema.
- *
- * @param in_schema input schema
- * @param out_schema output schema
- */
- protected void onReady(Schema in_schema, Schema out_schema)
- {
- /* ze goggles! zey do nussing! */
- }
-
- /**
- * Override outputSchema only to add the onReady hook method. In all
- * other respects delegates to the superclass outputSchema preparation.
- *
- * @param in_schema input schema
- * @return call to super.outputSchema
- */
- @Override
- public Schema outputSchema(Schema in_schema)
- {
- Schema out_schema = super.outputSchema(in_schema);
- onReady(in_schema, out_schema);
- return out_schema;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/faab884c/datafu-pig/src/main/java/datafu/pig/util/SimpleEvalFunc.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/util/SimpleEvalFunc.java b/datafu-pig/src/main/java/datafu/pig/util/SimpleEvalFunc.java
index 80bbce9..2d262b4 100644
--- a/datafu-pig/src/main/java/datafu/pig/util/SimpleEvalFunc.java
+++ b/datafu-pig/src/main/java/datafu/pig/util/SimpleEvalFunc.java
@@ -23,7 +23,7 @@ import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
-import datafu.pig.util.ContextualEvalFunc;
+import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -98,7 +98,7 @@ import org.apache.pig.impl.logicalLayer.schema.Schema;
*/
-public abstract class SimpleEvalFunc<T> extends ContextualEvalFunc<T>
+public abstract class SimpleEvalFunc<T> extends EvalFunc<T>
{
// TODO Add support for other UDF types (e.g., FilterFunc)
// TODO Algebraic EvalFuncs
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/faab884c/datafu-pig/src/test/java/datafu/test/pig/util/EvalFuncTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/util/EvalFuncTests.java b/datafu-pig/src/test/java/datafu/test/pig/util/EvalFuncTests.java
deleted file mode 100644
index b46dd1b..0000000
--- a/datafu-pig/src/test/java/datafu/test/pig/util/EvalFuncTests.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.test.pig.util;
-
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.pig.pigunit.PigTest;
-import org.testng.annotations.Test;
-import datafu.test.pig.PigTests;
-import junit.framework.Assert;
-
-import datafu.test.pig.util.SchemaToString;
-
-public class EvalFuncTests extends PigTests
-{
-
- /**
-
-
- define SchemaToString datafu.test.pig.util.SchemaToString();
-
- data = LOAD 'input' AS (city:chararray, state:chararray, pop:int);
-
- with_schema = FOREACH data {
- GENERATE city, state, pop, SchemaToString( (city, state, pop) );
- };
-
- STORE with_schema INTO 'output';
- */
- @Multiline private String onReadyTest;
-
- @Test
- public void onReadyTest() throws Exception
- {
- PigTest test = createPigTestFromString(onReadyTest);
-
- writeLinesToFile("input",
- "New York\tNY\t8244910",
- "Austin\tTX\t820611",
- "San Francisco\tCA\t812826");
-
- test.runScript();
-
- assertOutput(test, "with_schema",
- "(New York,NY,8244910,{(city: chararray,state: chararray,pop: int)})",
- "(Austin,TX,820611,{(city: chararray,state: chararray,pop: int)})",
- "(San Francisco,CA,812826,{(city: chararray,state: chararray,pop: int)})");
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/faab884c/datafu-pig/src/test/java/datafu/test/pig/util/SchemaToString.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/util/SchemaToString.java b/datafu-pig/src/test/java/datafu/test/pig/util/SchemaToString.java
deleted file mode 100644
index e8714c4..0000000
--- a/datafu-pig/src/test/java/datafu/test/pig/util/SchemaToString.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.test.pig.util;
-
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-import datafu.pig.util.SimpleEvalFunc;
-
-/**
- * Simple UDF for testing purposes.
- *
- * Captures a value at front-end (launch) time, namely a string for the input
- * schema, and returns its value (ignoring the actual input it characterizes)
- */
-public class SchemaToString extends SimpleEvalFunc<String> {
- private String schema_string = null;
-
- /**
- * Hook to capture the input schema string that we will spit out later
- */
- @Override
- public void onReady(Schema in_schema, Schema out_schema) {
- String sch_str = in_schema.toString();
- sch_str = sch_str.replaceFirst("[\\w\\.]+: ", "");
- getInstanceProperties().put("schema_string", sch_str);
- super.onReady(in_schema, out_schema);
- }
-
- /*
- * @param a tuple
- * @return the schema of the tuple
- */
- public String call(Tuple tup)
- {
- if (schema_string == null) { schema_string = (String)getInstanceProperties().get("schema_string"); }
- return schema_string;
- }
-}