You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datafu.apache.org by mh...@apache.org on 2014/05/26 04:52:20 UTC

[1/2] git commit: DATAFU-10 Upgrade to Pig 0.12.1

Repository: incubator-datafu
Updated Branches:
  refs/heads/master c4149151e -> 412b14bc9


DATAFU-10 Upgrade to Pig 0.12.1

https://issues.apache.org/jira/browse/DATAFU-10


Project: http://git-wip-us.apache.org/repos/asf/incubator-datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-datafu/commit/412b14bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-datafu/tree/412b14bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-datafu/diff/412b14bc

Branch: refs/heads/master
Commit: 412b14bc9169a8f4f7433f02551cbbf698846f20
Parents: faab884
Author: Matt Hayes <mh...@linkedin.com>
Authored: Mon May 12 14:08:02 2014 -0700
Committer: Matt Hayes <mh...@linkedin.com>
Committed: Sun May 25 18:23:10 2014 -0700

----------------------------------------------------------------------
 .../datafu/test/pig/hash/lsh/LSHPigTest.java    |   2 +-
 .../datafu/test/pig/sampling/SamplingTests.java | 166 +++++++++----------
 .../WeightedReservoirSamplingTests.java         | 146 ++++++++--------
 gradle/dependency-versions.gradle               |   2 +-
 4 files changed, 158 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/412b14bc/datafu-pig/src/test/java/datafu/test/pig/hash/lsh/LSHPigTest.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/hash/lsh/LSHPigTest.java b/datafu-pig/src/test/java/datafu/test/pig/hash/lsh/LSHPigTest.java
index a35c4f2..ac3e409 100644
--- a/datafu-pig/src/test/java/datafu/test/pig/hash/lsh/LSHPigTest.java
+++ b/datafu-pig/src/test/java/datafu/test/pig/hash/lsh/LSHPigTest.java
@@ -168,7 +168,7 @@ public class LSHPigTest extends PigTests
  describe NEAR_NEIGHBORS;
   NEIGHBORS_PROJ = foreach NEAR_NEIGHBORS {
    
-   generate query_pt as query_pt, neighbor.pt as matching_pts;
+   generate TOTUPLE(query_pt) as query_pt, neighbor.pt as matching_pts;
   };
   describe NEIGHBORS_PROJ;
   NOT_NULL = filter NEIGHBORS_PROJ by SIZE(matching_pts) > 0;

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/412b14bc/datafu-pig/src/test/java/datafu/test/pig/sampling/SamplingTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/sampling/SamplingTests.java b/datafu-pig/src/test/java/datafu/test/pig/sampling/SamplingTests.java
index 2f685d4..418a694 100644
--- a/datafu-pig/src/test/java/datafu/test/pig/sampling/SamplingTests.java
+++ b/datafu-pig/src/test/java/datafu/test/pig/sampling/SamplingTests.java
@@ -47,69 +47,69 @@ import datafu.test.pig.PigTests;
 public class SamplingTests extends PigTests
 {
   /**
-  
+
 
   define WeightedSample datafu.pig.sampling.WeightedSample('1');
-  
+
   data = LOAD 'input' AS (A: bag {T: tuple(v1:chararray,v2:INT)});
-  
+
   data2 = FOREACH data GENERATE WeightedSample(A,1);
   --describe data2;
-  
+
   STORE data2 INTO 'output';
 
    */
   @Multiline
   private String weightedSampleTest;
-  
+
   @Test
   public void weightedSampleTest() throws Exception
   {
     PigTest test = createPigTestFromString(weightedSampleTest);
 
-    writeLinesToFile("input", 
+    writeLinesToFile("input",
                      "({(a, 100),(b, 1),(c, 5),(d, 2)})");
-                  
+
     test.runScript();
-            
+
     assertOutput(test, "data2",
         "({(a,100),(c,5),(b,1),(d,2)})");
   }
-  
+
   /**
-  
+
 
   define WeightedSample datafu.pig.sampling.WeightedSample('1');
-  
+
   data = LOAD 'input' AS (A: bag {T: tuple(v1:chararray,v2:INT)});
-  
+
   data2 = FOREACH data GENERATE WeightedSample(A,1,3);
   --describe data2;
-  
+
   STORE data2 INTO 'output';
    */
   @Multiline
   private String weightedSampleLimitTest;
-  
+
   @Test
   public void weightedSampleLimitTest() throws Exception
   {
     PigTest test = createPigTestFromString(weightedSampleLimitTest);
 
-    writeLinesToFile("input", 
+    writeLinesToFile("input",
                      "({(a, 100),(b, 1),(c, 5),(d, 2)})");
-                  
+
     test.runScript();
-            
+
     assertOutput(test, "data2",
         "({(a,100),(c,5),(b,1)})");
   }
-  
+
   @Test
   public void weightedSampleLimitExecTest() throws IOException
   {
     WeightedSample sampler = new WeightedSample();
-    
+
     DataBag bag = BagFactory.getInstance().newDefaultBag();
     for (int i=0; i<100; i++)
     {
@@ -118,16 +118,16 @@ public class SamplingTests extends PigTests
       t.set(1, 1); // score is equal for all
       bag.add(t);
     }
-    
+
     Tuple input = TupleFactory.getInstance().newTuple(3);
     input.set(0, bag);
     input.set(1, 1); // use index 1 for score
     input.set(2, 10); // get 10 items
-    
+
     DataBag result = sampler.exec(input);
-    
+
     Assert.assertEquals(10, result.size());
-    
+
     // all must be found, no repeats
     Set<Integer> found = new HashSet<Integer>();
     for (Tuple t : result)
@@ -139,26 +139,26 @@ public class SamplingTests extends PigTests
       found.add(i);
     }
   }
-  
+
   /**
-  
-  
+
+
   DEFINE SampleByKey datafu.pig.sampling.SampleByKey('0.5', 'salt2.5');
-  
+
   data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
   sampled = FILTER data BY SampleByKey(A_id);
-  
+
   STORE sampled INTO 'output';
 
    */
   @Multiline
   private String sampleByKeyTest;
-  
+
   @Test
   public void sampleByKeyTest() throws Exception
   {
     PigTest test = createPigTestFromString(sampleByKeyTest);
-    
+
     writeLinesToFile("input",
                      "A1\tB1\t1","A1\tB1\t4","A1\tB3\t4","A1\tB4\t4",
                      "A2\tB1\t4","A2\tB2\t4",
@@ -170,9 +170,9 @@ public class SamplingTests extends PigTests
                      "A8\tB1\t4","A8\tB2\t45",
                      "A9\tB3\t92", "A9\tB1\t42","A9\tB2\t1","A9\tB3\t0",
                      "A10\tB1\t7","A10\tB2\t23","A10\tB3\t1","A10\tB4\t41","A10\tB5\t52");
-    
+
     test.runScript();
-    assertOutput(test, "sampled", 
+    assertOutput(test, "sampled",
                  "(A4,B1,3)","(A4,B2,3)","(A4,B3,59)","(A4,B4,29)",
                  "(A5,B1,4)",
                  "(A6,B2,3)","(A6,B2,55)","(A6,B3,1)",
@@ -181,24 +181,24 @@ public class SamplingTests extends PigTests
   }
 
   /**
-  
-  
+
+
   DEFINE SampleByKey datafu.pig.sampling.SampleByKey('0.5', 'salt2.5');
-  
+
   data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
   sampled = FILTER data BY SampleByKey(A_id, B_id);
-  
+
   STORE sampled INTO 'output';
 
    */
   @Multiline
   private String sampleByKeyMultipleKeyTest;
-  
+
   @Test
   public void sampleByKeyMultipleKeyTest() throws Exception
   {
     PigTest test = createPigTestFromString(sampleByKeyMultipleKeyTest);
-    
+
     writeLinesToFile("input",
                      "A1\tB1\t1","A1\tB1\t4",
                      "A1\tB3\t4",
@@ -226,7 +226,7 @@ public class SamplingTests extends PigTests
                      "A10\tB6\t41",
                      "A10\tB7\t52");
     test.runScript();
-    assertOutput(test, "sampled", 
+    assertOutput(test, "sampled",
                  "(A1,B1,1)","(A1,B1,4)",
                  "(A1,B4,4)",
                  "(A2,B1,4)",
@@ -240,16 +240,16 @@ public class SamplingTests extends PigTests
                  "(A9,B3,92)","(A9,B3,0)",
                  "(A10,B2,23)","(A10,B2,1)","(A10,B2,31)"
                  );
-                   
+
   }
-  
+
   @Test
   public void sampleByKeyExecTest() throws Exception
   {
     SampleByKey sampler = new SampleByKey("0.10", "thesalt");
-    
+
     Map<Integer,Integer> valuesPerKey = new HashMap<Integer,Integer>();
-    
+
     // 10,000 keys total
     for (int i=0; i<10000; i++)
     {
@@ -259,7 +259,7 @@ public class SamplingTests extends PigTests
         Tuple t = TupleFactory.getInstance().newTuple(1);
         t.set(0, i);
         if (sampler.exec(t))
-        {          
+        {
           if (valuesPerKey.containsKey(i))
           {
             valuesPerKey.put(i, valuesPerKey.get(i)+1);
@@ -271,22 +271,22 @@ public class SamplingTests extends PigTests
         }
       }
     }
-    
+
     // 10% sample, so should have roughly 1000 keys
     Assert.assertTrue(Math.abs(1000-valuesPerKey.size()) < 50);
-    
+
     // every value should be present for the same key
     for (Map.Entry<Integer, Integer> pair : valuesPerKey.entrySet())
     {
       Assert.assertEquals(5, pair.getValue().intValue());
     }
   }
-  
+
   /**
-  
+
 
   DEFINE ReservoirSample datafu.pig.sampling.ReservoirSample('$RESERVOIR_SIZE');
-  
+
   data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
   sampled = FOREACH (GROUP data ALL) GENERATE ReservoirSample(data) as sample_data;
   sampled = FOREACH sampled GENERATE COUNT(sample_data) AS sample_count;
@@ -295,11 +295,11 @@ public class SamplingTests extends PigTests
    */
   @Multiline
   private String reservoirSampleTest;
-  
+
   @Test
   public void reservoirSampleTest() throws Exception
   {
-    
+
     writeLinesToFile("input",
                      "A1\tB1\t1",
                      "A1\tB1\t4",
@@ -333,7 +333,7 @@ public class SamplingTests extends PigTests
                      "A10\tB2\t31",
                      "A10\tB6\t41",
                      "A10\tB7\t52");
-   
+
     for(int i=10; i<=30; i=i+10){
       int reservoirSize = i ;
       PigTest test = createPigTestFromString(reservoirSampleTest, "RESERVOIR_SIZE="+reservoirSize);
@@ -341,36 +341,36 @@ public class SamplingTests extends PigTests
       assertOutput(test, "sampled", "("+reservoirSize+")");
     }
   }
-  
+
   /**
-  
+
 
   DEFINE ReservoirSample datafu.pig.sampling.ReservoirSample('$RESERVOIR_SIZE');
-  DEFINE Assert datafu.pig.util.Assert();
-  
+  DEFINE AssertUDF datafu.pig.util.AssertUDF();
+
   data = LOAD 'input' AS (A_id:int, B_id:chararray, C:int);
   sampled = FOREACH (GROUP data BY A_id) GENERATE group as A_id, ReservoirSample(data.(B_id,C)) as sample_data;
-  sampled = FILTER sampled BY Assert((SIZE(sample_data) <= $RESERVOIR_SIZE ? 1 : 0), 'must be <= $RESERVOIR_SIZE');
+  sampled = FILTER sampled BY AssertUDF((SIZE(sample_data) <= $RESERVOIR_SIZE ? 1 : 0), 'must be <= $RESERVOIR_SIZE');
   sampled = FOREACH sampled GENERATE A_id, FLATTEN(sample_data);
   STORE sampled INTO 'output';
 
    */
   @Multiline
   private String reservoirSampleGroupTest;
-  
+
   /**
    * Verifies that ReservoirSample works when data grouped by a key.
    * In particular it ensures that the reservoir is not reused across keys.
-   * 
+   *
    * <p>
    * This confirms the fix for DATAFU-11.
    * </p>
-   * 
+   *
    * @throws Exception
    */
   @Test
   public void reservoirSampleGroupTest() throws Exception
-  {    
+  {
     // first value is the key.  last value matches the key so we can
     // verify the register is reset for each key.  values should not
     // bleed across to other keys.
@@ -407,26 +407,26 @@ public class SamplingTests extends PigTests
                      "10\tB2\t10",
                      "10\tB6\t10",
                      "10\tB7\t10");
-   
+
     for(int i=1; i<=3; i++) {
       int reservoirSize = i ;
       PigTest test = createPigTestFromString(reservoirSampleGroupTest, "RESERVOIR_SIZE="+reservoirSize);
       test.runScript();
-      
+
       List<Tuple> tuples = getLinesForAlias(test, "sampled");
-      
+
       for (Tuple tuple : tuples)
       {
         Assert.assertEquals(((Number)tuple.get(0)).intValue(), ((Number)tuple.get(2)).intValue());
       }
     }
   }
-  
+
   @Test
   public void reservoirSampleExecTest() throws IOException
   {
     ReservoirSample sampler = new ReservoirSample("10");
-    
+
     DataBag bag = BagFactory.getInstance().newDefaultBag();
     for (int i=0; i<100; i++)
     {
@@ -434,13 +434,13 @@ public class SamplingTests extends PigTests
       t.set(0, i);
       bag.add(t);
     }
-    
+
     Tuple input = TupleFactory.getInstance().newTuple(bag);
-    
+
     DataBag result = sampler.exec(input);
-    
+
     Assert.assertEquals(10, result.size());
-    
+
     // all must be found, no repeats
     Set<Integer> found = new HashSet<Integer>();
     for (Tuple t : result)
@@ -452,12 +452,12 @@ public class SamplingTests extends PigTests
       found.add(i);
     }
   }
-  
+
   @Test
   public void reservoirSampleAccumulateTest() throws IOException
   {
     ReservoirSample sampler = new ReservoirSample("10");
-    
+
     for (int i=0; i<100; i++)
     {
       Tuple t = TupleFactory.getInstance().newTuple(1);
@@ -467,11 +467,11 @@ public class SamplingTests extends PigTests
       Tuple input = TupleFactory.getInstance().newTuple(bag);
       sampler.accumulate(input);
     }
-        
+
     DataBag result = sampler.getValue();
-    
+
     Assert.assertEquals(10, result.size());
-    
+
     // all must be found, no repeats
     Set<Integer> found = new HashSet<Integer>();
     for (Tuple t : result)
@@ -483,14 +483,14 @@ public class SamplingTests extends PigTests
       found.add(i);
     }
   }
-  
+
   @Test
   public void reservoirSampleAlgebraicTest() throws IOException
   {
     ReservoirSample.Initial initialSampler = new ReservoirSample.Initial("10");
     ReservoirSample.Intermediate intermediateSampler = new ReservoirSample.Intermediate("10");
     ReservoirSample.Final finalSampler = new ReservoirSample.Final("10");
-    
+
     DataBag bag = BagFactory.getInstance().newDefaultBag();
     for (int i=0; i<100; i++)
     {
@@ -498,17 +498,17 @@ public class SamplingTests extends PigTests
       t.set(0, i);
       bag.add(t);
     }
-    
+
     Tuple input = TupleFactory.getInstance().newTuple(bag);
-    
-    Tuple intermediateTuple = initialSampler.exec(input);  
+
+    Tuple intermediateTuple = initialSampler.exec(input);
     DataBag intermediateBag = BagFactory.getInstance().newDefaultBag(Arrays.asList(intermediateTuple));
-    intermediateTuple = intermediateSampler.exec(TupleFactory.getInstance().newTuple(intermediateBag));  
+    intermediateTuple = intermediateSampler.exec(TupleFactory.getInstance().newTuple(intermediateBag));
     intermediateBag = BagFactory.getInstance().newDefaultBag(Arrays.asList(intermediateTuple));
     DataBag result = finalSampler.exec(TupleFactory.getInstance().newTuple(intermediateBag));
-    
+
     Assert.assertEquals(10, result.size());
-    
+
     // all must be found, no repeats
     Set<Integer> found = new HashSet<Integer>();
     for (Tuple t : result)

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/412b14bc/datafu-pig/src/test/java/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java b/datafu-pig/src/test/java/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java
index 24e7ec7..7511324 100644
--- a/datafu-pig/src/test/java/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java
+++ b/datafu-pig/src/test/java/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java
@@ -39,40 +39,40 @@ import datafu.test.pig.PigTests;
 /**
  * Tests for {@link WeightedReservoirSample}.
  *
- * @author wjian 
+ * @author wjian
  *
  */
 public class WeightedReservoirSamplingTests extends PigTests
 {
   /**
-  
+
 
   DEFINE ReservoirSample datafu.pig.sampling.WeightedReservoirSample('$RESERVOIR_SIZE','2');
-  DEFINE Assert datafu.pig.util.Assert();
-  
+  DEFINE AssertUDF datafu.pig.util.AssertUDF();
+
   data = LOAD 'input' AS (A_id:int, B_id:chararray, C:int, W:double);
   sampled = FOREACH (GROUP data BY A_id) GENERATE group as A_id, ReservoirSample(data.(B_id,C,W)) as sample_data;
-  sampled = FILTER sampled BY Assert((SIZE(sample_data) <= $RESERVOIR_SIZE ? 1 : 0), 'must be <= $RESERVOIR_SIZE');
+  sampled = FILTER sampled BY AssertUDF((SIZE(sample_data) <= $RESERVOIR_SIZE ? 1 : 0), 'must be <= $RESERVOIR_SIZE');
   sampled = FOREACH sampled GENERATE A_id, FLATTEN(sample_data);
   STORE sampled INTO 'output';
 
    */
   @Multiline
   private String weightedReservoirSampleGroupTest;
-  
+
   /**
    * Verifies that WeightedReservoirSample works when data grouped by a key.
    * In particular it ensures that the reservoir is not reused across keys.
-   * 
+   *
    * <p>
    * This confirms the fix for DATAFU-11.
    * </p>
-   * 
+   *
    * @throws Exception
    */
   @Test
   public void weightedReservoirSampleGroupTest() throws Exception
-  {    
+  {
     // first value is the key.  second to last value matches the key so we can
     // verify the register is reset for each key.  values should not
     // bleed across to other keys.
@@ -109,39 +109,39 @@ public class WeightedReservoirSamplingTests extends PigTests
                      "10\tB2\t10\t1.0",
                      "10\tB6\t10\t1.0",
                      "10\tB7\t10\t1.0");
-   
+
     for(int i=1; i<=3; i++) {
       int reservoirSize = i ;
       PigTest test = createPigTestFromString(weightedReservoirSampleGroupTest, "RESERVOIR_SIZE="+reservoirSize);
       test.runScript();
-      
+
       List<Tuple> tuples = getLinesForAlias(test, "sampled");
-      
+
       for (Tuple tuple : tuples)
       {
         Assert.assertEquals(((Number)tuple.get(0)).intValue(), ((Number)tuple.get(2)).intValue());
       }
     }
   }
-  
- /** 
-   
- 
-  define WeightedSample datafu.pig.sampling.WeightedReservoirSample('1','1'); 
+
+ /**
+
+
+  define WeightedSample datafu.pig.sampling.WeightedReservoirSample('1','1');
 
   data = LOAD 'input' AS (v1:chararray,v2:INT);
   data_g = group data all;
   sampled = FOREACH data_g GENERATE WeightedSample(data);
-  --describe sampled; 
-   
-  STORE sampled INTO 'output'; 
- 
-  */ 
-  @Multiline 
-  private String weightedSampleTest; 
-   
-  @Test 
-  public void weightedSampleTest() throws Exception 
+  --describe sampled;
+
+  STORE sampled INTO 'output';
+
+  */
+  @Multiline
+  private String weightedSampleTest;
+
+  @Test
+  public void weightedSampleTest() throws Exception
   {
      Map<String, Integer> count = new HashMap<String, Integer>();
 
@@ -150,9 +150,9 @@ public class WeightedReservoirSamplingTests extends PigTests
      count.put("c", 0);
      count.put("d", 0);
 
-     PigTest test = createPigTestFromString(weightedSampleTest); 
- 
-     writeLinesToFile("input",  
+     PigTest test = createPigTestFromString(weightedSampleTest);
+
+     writeLinesToFile("input",
                 "a\t100","b\t1","c\t5","d\t2");
 
      for(int i = 0; i < 10; i++) {
@@ -168,16 +168,16 @@ public class WeightedReservoirSamplingTests extends PigTests
            Tuple st = sampleIter.next();
            String key = (String)st.get(0);
            count.put(key, count.get(key) + 1);
-        }              
+        }
      }
 
      String maxKey = "";
      int maxCount = 0;
      for(String key : count.keySet()) {
         if(maxCount < count.get(key)) {
-           maxKey = key; 
+           maxKey = key;
            maxCount = count.get(key);
-        } 
+        }
      }
 
      Assert.assertEquals(maxKey, "a");
@@ -200,7 +200,7 @@ public class WeightedReservoirSamplingTests extends PigTests
      }
 
      DataBag result = sampler.getValue();
-     verifyNoRepeatAllFound(result, 10, 0, 100); 
+     verifyNoRepeatAllFound(result, 10, 0, 100);
   }
 
   @Test
@@ -209,7 +209,7 @@ public class WeightedReservoirSamplingTests extends PigTests
     WeightedReservoirSample.Initial initialSampler = new WeightedReservoirSample.Initial("10", "1");
     WeightedReservoirSample.Intermediate intermediateSampler = new WeightedReservoirSample.Intermediate("10", "1");
     WeightedReservoirSample.Final finalSampler = new WeightedReservoirSample.Final("10", "1");
-    
+
     DataBag bag = BagFactory.getInstance().newDefaultBag();
     for (int i=0; i<100; i++)
     {
@@ -218,15 +218,15 @@ public class WeightedReservoirSamplingTests extends PigTests
       t.set(1, i + 1);
       bag.add(t);
     }
-    
+
     Tuple input = TupleFactory.getInstance().newTuple(bag);
-    
-    Tuple intermediateTuple = initialSampler.exec(input);  
+
+    Tuple intermediateTuple = initialSampler.exec(input);
     DataBag intermediateBag = BagFactory.getInstance().newDefaultBag(Arrays.asList(intermediateTuple));
-    intermediateTuple = intermediateSampler.exec(TupleFactory.getInstance().newTuple(intermediateBag));  
+    intermediateTuple = intermediateSampler.exec(TupleFactory.getInstance().newTuple(intermediateBag));
     intermediateBag = BagFactory.getInstance().newDefaultBag(Arrays.asList(intermediateTuple));
     DataBag result = finalSampler.exec(TupleFactory.getInstance().newTuple(intermediateBag));
-    verifyNoRepeatAllFound(result, 10, 0, 100); 
+    verifyNoRepeatAllFound(result, 10, 0, 100);
    }
 
   private void verifyNoRepeatAllFound(DataBag result,
@@ -235,7 +235,7 @@ public class WeightedReservoirSamplingTests extends PigTests
                                       int right) throws ExecException
   {
     Assert.assertEquals(expectedResultSize, result.size());
-    
+
     // all must be found, no repeats
     Set<Integer> found = new HashSet<Integer>();
     for (Tuple t : result)
@@ -251,7 +251,7 @@ public class WeightedReservoirSamplingTests extends PigTests
   public void weightedReservoirSampleLimitExecTest() throws IOException
   {
     WeightedReservoirSample sampler = new WeightedReservoirSample("100", "1");
-   
+
     DataBag bag = BagFactory.getInstance().newDefaultBag();
     for (int i=0; i<10; i++)
     {
@@ -260,13 +260,13 @@ public class WeightedReservoirSamplingTests extends PigTests
       t.set(1, 1); // score is equal for all
       bag.add(t);
     }
-   
+
     Tuple input = TupleFactory.getInstance().newTuple(1);
     input.set(0, bag);
-   
+
     DataBag result = sampler.exec(input);
-   
-    verifyNoRepeatAllFound(result, 10, 0, 10); 
+
+    verifyNoRepeatAllFound(result, 10, 0, 10);
 
     Set<Integer> found = new HashSet<Integer>();
     for (Tuple t : result)
@@ -297,7 +297,7 @@ public class WeightedReservoirSamplingTests extends PigTests
   {
     PigTest test = createPigTestFromString(weightedSampleTest);
 
-    writeLinesToFile("input",  
+    writeLinesToFile("input",
                 "a\t100","b\t1","c\t0","d\t2");
     try {
          test.runScript();
@@ -307,28 +307,28 @@ public class WeightedReservoirSamplingTests extends PigTests
     }
   }
 
- /** 
-   
- 
-  define WeightedSample datafu.pig.sampling.WeightedReservoirSample('1','1'); 
+ /**
+
+
+  define WeightedSample datafu.pig.sampling.WeightedReservoirSample('1','1');
 
   data = LOAD 'input' AS (v1:chararray);
   data_g = group data all;
   sampled = FOREACH data_g GENERATE WeightedSample(data);
-  describe sampled; 
-   
-  STORE sampled INTO 'output'; 
- 
-  */ 
-  @Multiline 
-  private String invalidInputTupleSizeTest; 
- 
+  describe sampled;
+
+  STORE sampled INTO 'output';
+
+  */
+  @Multiline
+  private String invalidInputTupleSizeTest;
+
   @Test
   public void invalidInputTupleSizeTest() throws Exception
   {
     PigTest test = createPigTestFromString(invalidInputTupleSizeTest);
 
-    writeLinesToFile("input",  
+    writeLinesToFile("input",
                 "a","b","c","d");
     try {
          test.runScript();
@@ -339,28 +339,28 @@ public class WeightedReservoirSamplingTests extends PigTests
     }
   }
 
- /** 
-   
- 
-  define WeightedSample datafu.pig.sampling.WeightedReservoirSample('1','0'); 
+ /**
+
+
+  define WeightedSample datafu.pig.sampling.WeightedReservoirSample('1','0');
 
   data = LOAD 'input' AS (v1:chararray, v2:INT);
   data_g = group data all;
   sampled = FOREACH data_g GENERATE WeightedSample(data);
-  describe sampled; 
-   
-  STORE sampled INTO 'output'; 
- 
-  */ 
-  @Multiline 
-  private String invalidWeightFieldSchemaTest; 
- 
+  describe sampled;
+
+  STORE sampled INTO 'output';
+
+  */
+  @Multiline
+  private String invalidWeightFieldSchemaTest;
+
   @Test
   public void invalidWeightFieldSchemaTest() throws Exception
   {
     PigTest test = createPigTestFromString(invalidWeightFieldSchemaTest);
 
-    writeLinesToFile("input",  
+    writeLinesToFile("input",
                 "a\t100","b\t1","c\t5","d\t2");
     try {
          test.runScript();

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/412b14bc/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 848074c..eb24e4a 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -11,7 +11,7 @@ ext {
   log4jVersion="1.2.17"
   mavenVersion="2.1.3"
   jlineVersion="0.9.94"
-  pigVersion="0.11.1"
+  pigVersion="0.12.1"
   testngVersion="6.2"
   toolsVersion="1.4.2"
   wagonHttpVersion="1.0-beta-2"


[2/2] git commit: Revert "DATAFU-50 SimpleEvalFunc should extend ContextualEvalFunc, have good lifecycle hooks"

Posted by mh...@apache.org.
Revert "DATAFU-50 SimpleEvalFunc should extend ContextualEvalFunc, have good lifecycle hooks"

This reverts commit a36cab397505837113a5193de52e6439bd12c831.

Reverted because the change caused multiple test failures.


Project: http://git-wip-us.apache.org/repos/asf/incubator-datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-datafu/commit/faab884c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-datafu/tree/faab884c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-datafu/diff/faab884c

Branch: refs/heads/master
Commit: faab884c278a3ab5197ec3cd8931f84a664f4904
Parents: c414915
Author: Matt Hayes <mh...@linkedin.com>
Authored: Sun May 25 17:56:44 2014 -0700
Committer: Matt Hayes <mh...@linkedin.com>
Committed: Sun May 25 18:23:10 2014 -0700

----------------------------------------------------------------------
 .../java/datafu/pig/util/AliasableEvalFunc.java |  8 +--
 .../datafu/pig/util/ContextualEvalFunc.java     | 39 +-----------
 .../java/datafu/pig/util/SimpleEvalFunc.java    |  4 +-
 .../datafu/test/pig/util/EvalFuncTests.java     | 66 --------------------
 .../datafu/test/pig/util/SchemaToString.java    | 56 -----------------
 5 files changed, 8 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/faab884c/datafu-pig/src/main/java/datafu/pig/util/AliasableEvalFunc.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/util/AliasableEvalFunc.java b/datafu-pig/src/main/java/datafu/pig/util/AliasableEvalFunc.java
index 87bca33..ee2c3f3 100644
--- a/datafu-pig/src/main/java/datafu/pig/util/AliasableEvalFunc.java
+++ b/datafu-pig/src/main/java/datafu/pig/util/AliasableEvalFunc.java
@@ -90,12 +90,12 @@ public abstract class AliasableEvalFunc<T> extends ContextualEvalFunc<T>
   }
   
   /**
-   * Hook to capture the input schema aliases
+   * A wrapper method which captures the schema and then calls getOutputSchema
    */
   @Override
-  public void onReady(Schema in_schema, Schema out_schema) {
-    storeFieldAliases(in_schema);
-    super.onReady(in_schema, out_schema);
+  public Schema outputSchema(Schema input) {
+    storeFieldAliases(input);
+    return getOutputSchema(input);
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/faab884c/datafu-pig/src/main/java/datafu/pig/util/ContextualEvalFunc.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/util/ContextualEvalFunc.java b/datafu-pig/src/main/java/datafu/pig/util/ContextualEvalFunc.java
index e2b4392..c534b77 100644
--- a/datafu-pig/src/main/java/datafu/pig/util/ContextualEvalFunc.java
+++ b/datafu-pig/src/main/java/datafu/pig/util/ContextualEvalFunc.java
@@ -23,15 +23,12 @@ import java.util.Properties;
 
 import org.apache.pig.EvalFunc;
 import org.apache.pig.impl.util.UDFContext;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
 
 /**
  * An abstract class which enables UDFs to store instance properties
  * on the front end which will be available on the back end.
- *
- * For example, you can override the onReady hook method to set properties
- * at front-end (i.e. at launch time) which will be available when exec() is
- * called (on the workers themselves.
+ * For example, properties may be set in the call to outputSchema(),
+ * which will be available when exec() is called.
  * 
  * @param <T>
  */
@@ -83,36 +80,4 @@ public abstract class ContextualEvalFunc<T> extends EvalFunc<T>
   private void setInstanceName(String instanceName) {
     this.instanceName = instanceName;
   }
-
-  /**
-   * Hook method, called once the input and output schema are prepared.
-   *
-   * Subclasses may override to set properties on the front end (i.e. at script
-   * run time) that may be played with later (e.g. at execution time).
-   *
-   * Child classes must (a) call super.onReady(in_schema, out_schema) so
-   * that the hook chains, and (b) not mess with the schema.
-   *
-   * @param in_schema input schema
-   * @param out_schema output schema
-   */
-  protected void onReady(Schema in_schema, Schema out_schema)
-  {
-    /* ze goggles! zey do nussing! */
-  }
-
-  /**
-   * Override outputSchema only to add the onReady hook method. In all
-   * other respects delegates to the superclass outputSchema preparation.
-   *
-   * @param in_schema input schema
-   * @return call to super.outputSchema
-   */
-  @Override
-  public Schema outputSchema(Schema in_schema)
-  {
-    Schema out_schema = super.outputSchema(in_schema);
-    onReady(in_schema, out_schema);
-    return out_schema;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/faab884c/datafu-pig/src/main/java/datafu/pig/util/SimpleEvalFunc.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/util/SimpleEvalFunc.java b/datafu-pig/src/main/java/datafu/pig/util/SimpleEvalFunc.java
index 80bbce9..2d262b4 100644
--- a/datafu-pig/src/main/java/datafu/pig/util/SimpleEvalFunc.java
+++ b/datafu-pig/src/main/java/datafu/pig/util/SimpleEvalFunc.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Type;
 
-import datafu.pig.util.ContextualEvalFunc;
+import org.apache.pig.EvalFunc;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -98,7 +98,7 @@ import org.apache.pig.impl.logicalLayer.schema.Schema;
 
 */
 
-public abstract class SimpleEvalFunc<T> extends ContextualEvalFunc<T>
+public abstract class SimpleEvalFunc<T> extends EvalFunc<T>
 {
   // TODO Add support for other UDF types (e.g., FilterFunc)
   // TODO Algebraic EvalFuncs 

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/faab884c/datafu-pig/src/test/java/datafu/test/pig/util/EvalFuncTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/util/EvalFuncTests.java b/datafu-pig/src/test/java/datafu/test/pig/util/EvalFuncTests.java
deleted file mode 100644
index b46dd1b..0000000
--- a/datafu-pig/src/test/java/datafu/test/pig/util/EvalFuncTests.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.test.pig.util;
-
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.pig.pigunit.PigTest;
-import org.testng.annotations.Test;
-import datafu.test.pig.PigTests;
-import junit.framework.Assert;
-
-import datafu.test.pig.util.SchemaToString;
-
-public class EvalFuncTests extends PigTests
-{
-
-  /**
-
-
-  define SchemaToString datafu.test.pig.util.SchemaToString();
-
-  data = LOAD 'input' AS (city:chararray, state:chararray, pop:int);
-
-  with_schema = FOREACH data {
-    GENERATE city, state, pop, SchemaToString( (city, state, pop) );
-    };
-
-  STORE with_schema INTO 'output';
-   */
-  @Multiline private String onReadyTest;
-
-  @Test
-  public void onReadyTest() throws Exception
-  {
-    PigTest test = createPigTestFromString(onReadyTest);
-
-    writeLinesToFile("input",
-      "New York\tNY\t8244910",
-      "Austin\tTX\t820611",
-      "San Francisco\tCA\t812826");
-
-    test.runScript();
-
-    assertOutput(test, "with_schema",
-      "(New York,NY,8244910,{(city: chararray,state: chararray,pop: int)})",
-      "(Austin,TX,820611,{(city: chararray,state: chararray,pop: int)})",
-      "(San Francisco,CA,812826,{(city: chararray,state: chararray,pop: int)})");
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/faab884c/datafu-pig/src/test/java/datafu/test/pig/util/SchemaToString.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/util/SchemaToString.java b/datafu-pig/src/test/java/datafu/test/pig/util/SchemaToString.java
deleted file mode 100644
index e8714c4..0000000
--- a/datafu-pig/src/test/java/datafu/test/pig/util/SchemaToString.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.test.pig.util;
-
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-import datafu.pig.util.SimpleEvalFunc;
-
-/**
- * Simple UDF for testing purposes.
- *
- * Captures a value at front-end (launch) time, namely a string for the input
- * schema, and returns its value (ignoring the actual input it characterizes)
- */
-public class SchemaToString extends SimpleEvalFunc<String> {
-  private String schema_string = null;
-
-  /**
-   * Hook to capture the input schema string that we will spit out later
-   */
-  @Override
-  public void onReady(Schema in_schema, Schema out_schema) {
-    String sch_str = in_schema.toString();
-    sch_str = sch_str.replaceFirst("[\\w\\.]+: ", "");
-    getInstanceProperties().put("schema_string", sch_str);
-    super.onReady(in_schema, out_schema);
-  }
-
-  /*
-   * @param  a tuple
-   * @return the schema of the tuple
-   */
-  public String call(Tuple tup)
-  {
-    if (schema_string == null) { schema_string = (String)getInstanceProperties().get("schema_string"); }
-    return schema_string;
-  }
-}