You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datafu.apache.org by ey...@apache.org on 2017/07/12 19:36:30 UTC

incubator-datafu git commit: DATAFU-124: Support millisecond duration in SessionCount and Sessionize

Repository: incubator-datafu
Updated Branches:
  refs/heads/master c53c1b971 -> 19837e6c2


DATAFU-124: Support millisecond duration in SessionCount and Sessionize

Signed-off-by: Eyal Allweil <ey...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-datafu/commit/19837e6c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-datafu/tree/19837e6c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-datafu/diff/19837e6c

Branch: refs/heads/master
Commit: 19837e6c2e1542b09343b6e33a89f7e84ef0b9bb
Parents: c53c1b9
Author: Jacob Tolar <jt...@yahoo-inc.com>
Authored: Fri Jun 23 13:49:50 2017 -0500
Committer: Eyal Allweil <ey...@apache.org>
Committed: Wed Jul 12 22:35:35 2017 +0300

----------------------------------------------------------------------
 .../java/datafu/pig/sessions/SessionCount.java  |   2 +-
 .../java/datafu/pig/sessions/Sessionize.java    |   4 +-
 .../datafu/test/pig/sessions/SessionTests.java  | 310 ++++++++-----------
 3 files changed, 138 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/19837e6c/datafu-pig/src/main/java/datafu/pig/sessions/SessionCount.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sessions/SessionCount.java b/datafu-pig/src/main/java/datafu/pig/sessions/SessionCount.java
index fe68888..86acedd 100644
--- a/datafu-pig/src/main/java/datafu/pig/sessions/SessionCount.java
+++ b/datafu-pig/src/main/java/datafu/pig/sessions/SessionCount.java
@@ -69,7 +69,7 @@ public class SessionCount extends AccumulatorEvalFunc<Long>
   public SessionCount(String timeSpec)
   {
     Period p = new Period("PT" + timeSpec.toUpperCase());
-    this.millis = p.toStandardSeconds().getSeconds() * 1000;
+    this.millis = p.toStandardDuration().getMillis();
     cleanup();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/19837e6c/datafu-pig/src/main/java/datafu/pig/sessions/Sessionize.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sessions/Sessionize.java b/datafu-pig/src/main/java/datafu/pig/sessions/Sessionize.java
index d81fb48..d4156b0 100644
--- a/datafu-pig/src/main/java/datafu/pig/sessions/Sessionize.java
+++ b/datafu-pig/src/main/java/datafu/pig/sessions/Sessionize.java
@@ -22,9 +22,7 @@ package datafu.pig.sessions;
 import java.io.IOException;
 import java.util.UUID;
 
-import org.apache.pig.Accumulator;
 import org.apache.pig.AccumulatorEvalFunc;
-import org.apache.pig.EvalFunc;
 import org.apache.pig.builtin.Nondeterministic;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
@@ -82,7 +80,7 @@ public class Sessionize extends AccumulatorEvalFunc<DataBag>
   public Sessionize(String timeSpec)
   {
     Period p = new Period("PT" + timeSpec.toUpperCase());
-    this.millis = p.toStandardSeconds().getSeconds() * 1000;
+    this.millis = p.toStandardDuration().getMillis();
 
     cleanup();
   }

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/19837e6c/datafu-pig/src/test/java/datafu/test/pig/sessions/SessionTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/sessions/SessionTests.java b/datafu-pig/src/test/java/datafu/test/pig/sessions/SessionTests.java
index 76ec1d3..d2271eb 100644
--- a/datafu-pig/src/test/java/datafu/test/pig/sessions/SessionTests.java
+++ b/datafu-pig/src/test/java/datafu/test/pig/sessions/SessionTests.java
@@ -23,13 +23,10 @@ import static org.testng.Assert.*;
 
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Date;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 
-import junit.framework.Assert;
-
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pig.data.BagFactory;
@@ -48,7 +45,6 @@ public class SessionTests extends PigTests
 {
   /**
   
-
   define Sessionize datafu.pig.sessions.Sessionize('$TIME_WINDOW');
   
   views = LOAD 'input' AS (time:$TIME_TYPE, user_id:int, value:int);
@@ -86,7 +82,7 @@ public class SessionTests extends PigTests
       "2010-01-01T01:10:00Z\t3\t25",
       "2010-01-01T01:15:00Z\t3\t50",
       "2010-01-01T01:25:00Z\t3\t30",
-      "2010-01-01T01:30:00Z\t3\t15"  
+      "2010-01-01T01:30:00Z\t3\t15"
   };
   
   @Test
@@ -144,7 +140,7 @@ public class SessionTests extends PigTests
     for (String line : inputData)
     {
       String[] parts = line.split("\t");
-      Assert.assertEquals(3, parts.length);
+      assertEquals(3, parts.length);
       parts[0] = Long.toString(dateFormat.parse(parts[0]).getTime());
       lines.add(StringUtils.join(parts,"\t"));
     }
@@ -179,121 +175,84 @@ public class SessionTests extends PigTests
     assertTrue(userValues.get(2).containsKey(40));
     assertTrue(userValues.get(2).containsKey(50));
   }
-  
+
   @Test
-  public void sessionizeExecTest() throws Exception
+  public void sessionizeExecTestMatching() throws Exception
   {
-    Sessionize sessionize = new Sessionize("30m");
-    Tuple input = TupleFactory.getInstance().newTuple(1);
-    DataBag inputBag = BagFactory.getInstance().newDefaultBag();
-    input.set(0,inputBag);
-    
-    Tuple item;
-    List<Tuple> result;
-    DateTime dt;
-    
-    // test same session id
-    inputBag.clear();
-    dt = new DateTime();
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.getMillis());
-    inputBag.add(item);
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.plusMinutes(28).getMillis());
-    inputBag.add(item);
-    result = toList(sessionize.exec(input));
-    
-    Assert.assertEquals(2, result.size());
-    Assert.assertEquals(2,result.get(0).size());
-    Assert.assertEquals(2,result.get(1).size());
-    // session ids match
-    Assert.assertTrue(result.get(0).get(1).equals(result.get(1).get(1))); 
-    
-    // test different session id
-    inputBag.clear();
-    dt = new DateTime();
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.getMillis());
-    inputBag.add(item);
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.plusMinutes(31).getMillis());
-    inputBag.add(item);
-    result = toList(sessionize.exec(input));
-    
-    Assert.assertEquals(2, result.size());
-    Assert.assertEquals(2,result.get(0).size());
-    Assert.assertEquals(2,result.get(1).size());
-    // session ids don't match
-    Assert.assertFalse(result.get(0).get(1).equals(result.get(1).get(1)));
+    DateTime dt = new DateTime();
+
+    List<Tuple> result = runSessionizeExec("30m", dt, dt.plusMinutes(28));
+
+    assertEquals(2, result.size());
+    assertEquals(2,result.get(0).size());
+    assertEquals(2,result.get(1).size());
+    assertTrue(result.get(0).get(1).equals(result.get(1).get(1)));
   }
-  
+
+  @Test
+  public void sessionizeExecTestNonMatching() throws Exception
+  {
+    DateTime dt = new DateTime();
+
+    List<Tuple> result = runSessionizeExec("30m", dt, dt.plusMinutes(31));
+
+    assertEquals(2, result.size());
+    assertEquals(2,result.get(0).size());
+    assertEquals(2,result.get(1).size());
+    assertFalse(result.get(0).get(1).equals(result.get(1).get(1)));
+  }
+
+  @Test
+  public void sessionizeExecTestMs() throws Exception
+  {
+    DateTime dt = new DateTime();
+
+    List<Tuple> result = runSessionizeExec("0.450S", dt, dt.plusMillis(449));
+
+    assertEquals(2, result.size());
+    assertEquals(2,result.get(0).size());
+    assertEquals(2,result.get(1).size());
+    assertTrue(result.get(0).get(1).equals(result.get(1).get(1)));
+
+    result = runSessionizeExec("0.450S", dt, dt.plusMillis(451));
+    assertEquals(2, result.size());
+    assertEquals(2,result.get(0).size());
+    assertEquals(2,result.get(1).size());
+    assertFalse(result.get(0).get(1).equals(result.get(1).get(1)));
+  }
+
   @Test
   public void sessionizeAccumulateTest() throws Exception
   {
     Sessionize sessionize = new Sessionize("30m");
-    Tuple input = TupleFactory.getInstance().newTuple(1);
-    DataBag inputBag = BagFactory.getInstance().newDefaultBag();
-    input.set(0,inputBag);
-    
-    Tuple item;
-    List<Tuple> result;
-    DateTime dt;
-    
-    // test same session id
-    dt = new DateTime();
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.getMillis());
-    inputBag.add(item);
-    sessionize.accumulate(input);
-    inputBag.clear();
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.plusMinutes(28).getMillis());
-    inputBag.add(item);
-    sessionize.accumulate(input);
-    inputBag.clear();
-    result = toList(sessionize.getValue());
-    
-    Assert.assertEquals(2, result.size());
-    Assert.assertEquals(2,result.get(0).size());
-    Assert.assertEquals(2,result.get(1).size());
+    DateTime dt = new DateTime();
+
+    sessionize.accumulate(buildInputBag(dt, dt.plusMinutes(28)));
+    List<Tuple> result =  toList(sessionize.getValue());
+
+    assertEquals(2, result.size());
+    assertEquals(2,result.get(0).size());
+    assertEquals(2,result.get(1).size());
     // session ids match
-    Assert.assertTrue(result.get(0).get(1).equals(result.get(1).get(1))); 
-    
-    // test different session id
+    assertTrue(result.get(0).get(1).equals(result.get(1).get(1)));
+
     sessionize.cleanup();
+
+    // test with another bag; session ids shouldn't match
     dt = new DateTime();
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.getMillis());
-    inputBag.add(item);
-    sessionize.accumulate(input);
-    inputBag.clear();
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.plusMinutes(31).getMillis());
-    inputBag.add(item);
-    sessionize.accumulate(input);
-    inputBag.clear();
-    result = toList(sessionize.getValue());
-    
-    Assert.assertEquals(2, result.size());
-    Assert.assertEquals(2,result.get(0).size());
-    Assert.assertEquals(2,result.get(1).size());
-    // session ids don't match
-    Assert.assertFalse(result.get(0).get(1).equals(result.get(1).get(1)));
+    sessionize.accumulate(buildInputBag(dt, dt.plusMinutes(31)));
+    result =  toList(sessionize.getValue());
+
+    assertEquals(2, result.size());
+    assertEquals(2,result.get(0).size());
+    assertEquals(2,result.get(1).size());
+    assertFalse(result.get(0).get(1).equals(result.get(1).get(1)));
     
     sessionize.cleanup();
-    Assert.assertEquals(0,sessionize.getValue().size());
-  }
-  
-  private List<Tuple> toList(DataBag bag)
-  {
-    List<Tuple> result = new ArrayList<Tuple>();
-    for (Tuple t : bag)
-    {
-      result.add(t);
-    }
-    return result;
+    assertEquals(0, sessionize.getValue().size());
   }
   
+
   /**
   
 
@@ -350,81 +309,84 @@ public class SessionTests extends PigTests
   }
   
   @Test
-  public void sessionCountExecTest() throws Exception
+  public void sessionCountOneExecTest() throws Exception
   {
-    SessionCount sessionize = new SessionCount("30m");
-    Tuple input = TupleFactory.getInstance().newTuple(1);
-    DataBag inputBag = BagFactory.getInstance().newDefaultBag();
-    input.set(0,inputBag);
-    
-    Tuple item;
-    DateTime dt;
-    
-    // test same session id
-    inputBag.clear();
-    dt = new DateTime();
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.getMillis());
-    inputBag.add(item);
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.plusMinutes(28).getMillis());
-    inputBag.add(item);
-    Assert.assertEquals(1L,sessionize.exec(input).longValue()); 
-    
-    // test different session id
-    inputBag.clear();
-    dt = new DateTime();
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.getMillis());
-    inputBag.add(item);
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.plusMinutes(31).getMillis());
-    inputBag.add(item);
-    Assert.assertEquals(2L,sessionize.exec(input).longValue());
+    DateTime dt = new DateTime();
+    long count = runSessionCountExec("30m", dt, dt.plusMinutes(28));
+    assertEquals(1L, count);
   }
-  
+
+  @Test
+  public void sessionCountTwoExecTest() throws Exception
+  {
+    DateTime dt = new DateTime();
+    long count = runSessionCountExec("30m", dt, dt.plusMinutes(31));
+    assertEquals(2L, count);
+  }
+
+  @Test
+  public void sessionCountMsExecTest() throws Exception
+  {
+    DateTime dt = new DateTime();
+    long count = runSessionCountExec("0.450S", dt, dt.plusMillis(1));
+    assertEquals(1L, count);
+
+    count = runSessionCountExec("0.450S", dt, dt.plusMillis(451));
+    assertEquals(2L, count);
+  }
+
   @Test
   public void sessionCountAccumulateTest() throws Exception
   {
     SessionCount sessionize = new SessionCount("30m");
-    Tuple input = TupleFactory.getInstance().newTuple(1);
-    DataBag inputBag = BagFactory.getInstance().newDefaultBag();
-    input.set(0,inputBag);
-    
-    Tuple item;
-    DateTime dt;
-    
-    // test same session id
-    dt = new DateTime();
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.getMillis());
-    inputBag.add(item);
-    sessionize.accumulate(input);
-    inputBag.clear();
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.plusMinutes(28).getMillis());
-    inputBag.add(item);
-    sessionize.accumulate(input);
-    inputBag.clear();
-    Assert.assertEquals(1L,sessionize.getValue().longValue()); 
-    
-    // test different session id
+    DateTime dt = new DateTime();
+    sessionize.accumulate(buildInputBag(dt, dt.plusMinutes(28)));
+    assertEquals(1L, sessionize.getValue().longValue());
+
     sessionize.cleanup();
     dt = new DateTime();
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.getMillis());
-    inputBag.add(item);
-    sessionize.accumulate(input);
-    inputBag.clear();
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.plusMinutes(31).getMillis());
-    inputBag.add(item);
-    sessionize.accumulate(input);
-    inputBag.clear();
-    Assert.assertEquals(2L,sessionize.exec(input).longValue());
-    
+
+    sessionize.accumulate(buildInputBag(dt, dt.plusMinutes(31)));
+    assertEquals(2L, sessionize.getValue().longValue());
+
     sessionize.cleanup();
-    Assert.assertEquals(0,sessionize.getValue().longValue());
+    assertEquals(0, sessionize.getValue().longValue());
+  }
+
+  private static List<Tuple> toList(DataBag bag)
+  {
+    List<Tuple> result = new ArrayList<Tuple>();
+    for (Tuple t : bag)
+    {
+      result.add(t);
+    }
+    return result;
+  }
+
+  private static Tuple buildInputBag(DateTime ...dt) throws Exception
+  {
+    Tuple input = TupleFactory.getInstance().newTuple(1);
+    DataBag inputBag = BagFactory.getInstance().newDefaultBag();
+    input.set(0,inputBag);
+
+    for (DateTime time : dt)
+    {
+      inputBag.add(TupleFactory.getInstance().newTuple(Collections.singletonList(time.getMillis())));
+    }
+
+    return input;
+  }
+
+  private static List<Tuple> runSessionizeExec(String timespec, DateTime ...dt) throws Exception
+  {
+    Sessionize sessionize = new Sessionize(timespec);
+    return toList(sessionize.exec(buildInputBag(dt)));
+  }
+
+  private static Long runSessionCountExec(String timespec, DateTime ...dt) throws Exception
+  {
+    SessionCount sessionCount = new SessionCount(timespec);
+    return sessionCount.exec(buildInputBag(dt));
   }
 }