You are viewing a plain-text version of this content. The canonical link for it (a hyperlink in the original page) is not preserved in this plain-text copy.
Posted to commits@datafu.apache.org by ey...@apache.org on 2017/07/12 19:36:30 UTC
incubator-datafu git commit: DATAFU-124: Support millisecond duration in SessionCount and Sessionize
Repository: incubator-datafu
Updated Branches:
refs/heads/master c53c1b971 -> 19837e6c2
DATAFU-124: Support millisecond duration in SessionCount and Sessionize
Signed-off-by: Eyal Allweil <ey...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-datafu/commit/19837e6c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-datafu/tree/19837e6c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-datafu/diff/19837e6c
Branch: refs/heads/master
Commit: 19837e6c2e1542b09343b6e33a89f7e84ef0b9bb
Parents: c53c1b9
Author: Jacob Tolar <jt...@yahoo-inc.com>
Authored: Fri Jun 23 13:49:50 2017 -0500
Committer: Eyal Allweil <ey...@apache.org>
Committed: Wed Jul 12 22:35:35 2017 +0300
----------------------------------------------------------------------
.../java/datafu/pig/sessions/SessionCount.java | 2 +-
.../java/datafu/pig/sessions/Sessionize.java | 4 +-
.../datafu/test/pig/sessions/SessionTests.java | 310 ++++++++-----------
3 files changed, 138 insertions(+), 178 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/19837e6c/datafu-pig/src/main/java/datafu/pig/sessions/SessionCount.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sessions/SessionCount.java b/datafu-pig/src/main/java/datafu/pig/sessions/SessionCount.java
index fe68888..86acedd 100644
--- a/datafu-pig/src/main/java/datafu/pig/sessions/SessionCount.java
+++ b/datafu-pig/src/main/java/datafu/pig/sessions/SessionCount.java
@@ -69,7 +69,7 @@ public class SessionCount extends AccumulatorEvalFunc<Long>
public SessionCount(String timeSpec)
{
Period p = new Period("PT" + timeSpec.toUpperCase());
- this.millis = p.toStandardSeconds().getSeconds() * 1000;
+ this.millis = p.toStandardDuration().getMillis();
cleanup();
}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/19837e6c/datafu-pig/src/main/java/datafu/pig/sessions/Sessionize.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sessions/Sessionize.java b/datafu-pig/src/main/java/datafu/pig/sessions/Sessionize.java
index d81fb48..d4156b0 100644
--- a/datafu-pig/src/main/java/datafu/pig/sessions/Sessionize.java
+++ b/datafu-pig/src/main/java/datafu/pig/sessions/Sessionize.java
@@ -22,9 +22,7 @@ package datafu.pig.sessions;
import java.io.IOException;
import java.util.UUID;
-import org.apache.pig.Accumulator;
import org.apache.pig.AccumulatorEvalFunc;
-import org.apache.pig.EvalFunc;
import org.apache.pig.builtin.Nondeterministic;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
@@ -82,7 +80,7 @@ public class Sessionize extends AccumulatorEvalFunc<DataBag>
public Sessionize(String timeSpec)
{
Period p = new Period("PT" + timeSpec.toUpperCase());
- this.millis = p.toStandardSeconds().getSeconds() * 1000;
+ this.millis = p.toStandardDuration().getMillis();
cleanup();
}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/19837e6c/datafu-pig/src/test/java/datafu/test/pig/sessions/SessionTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/sessions/SessionTests.java b/datafu-pig/src/test/java/datafu/test/pig/sessions/SessionTests.java
index 76ec1d3..d2271eb 100644
--- a/datafu-pig/src/test/java/datafu/test/pig/sessions/SessionTests.java
+++ b/datafu-pig/src/test/java/datafu/test/pig/sessions/SessionTests.java
@@ -23,13 +23,10 @@ import static org.testng.Assert.*;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Date;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
-import junit.framework.Assert;
-
import org.adrianwalker.multilinestring.Multiline;
import org.apache.commons.lang.StringUtils;
import org.apache.pig.data.BagFactory;
@@ -48,7 +45,6 @@ public class SessionTests extends PigTests
{
/**
-
define Sessionize datafu.pig.sessions.Sessionize('$TIME_WINDOW');
views = LOAD 'input' AS (time:$TIME_TYPE, user_id:int, value:int);
@@ -86,7 +82,7 @@ public class SessionTests extends PigTests
"2010-01-01T01:10:00Z\t3\t25",
"2010-01-01T01:15:00Z\t3\t50",
"2010-01-01T01:25:00Z\t3\t30",
- "2010-01-01T01:30:00Z\t3\t15"
+ "2010-01-01T01:30:00Z\t3\t15"
};
@Test
@@ -144,7 +140,7 @@ public class SessionTests extends PigTests
for (String line : inputData)
{
String[] parts = line.split("\t");
- Assert.assertEquals(3, parts.length);
+ assertEquals(3, parts.length);
parts[0] = Long.toString(dateFormat.parse(parts[0]).getTime());
lines.add(StringUtils.join(parts,"\t"));
}
@@ -179,121 +175,84 @@ public class SessionTests extends PigTests
assertTrue(userValues.get(2).containsKey(40));
assertTrue(userValues.get(2).containsKey(50));
}
-
+
@Test
- public void sessionizeExecTest() throws Exception
+ public void sessionizeExecTestMatching() throws Exception
{
- Sessionize sessionize = new Sessionize("30m");
- Tuple input = TupleFactory.getInstance().newTuple(1);
- DataBag inputBag = BagFactory.getInstance().newDefaultBag();
- input.set(0,inputBag);
-
- Tuple item;
- List<Tuple> result;
- DateTime dt;
-
- // test same session id
- inputBag.clear();
- dt = new DateTime();
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.getMillis());
- inputBag.add(item);
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.plusMinutes(28).getMillis());
- inputBag.add(item);
- result = toList(sessionize.exec(input));
-
- Assert.assertEquals(2, result.size());
- Assert.assertEquals(2,result.get(0).size());
- Assert.assertEquals(2,result.get(1).size());
- // session ids match
- Assert.assertTrue(result.get(0).get(1).equals(result.get(1).get(1)));
-
- // test different session id
- inputBag.clear();
- dt = new DateTime();
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.getMillis());
- inputBag.add(item);
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.plusMinutes(31).getMillis());
- inputBag.add(item);
- result = toList(sessionize.exec(input));
-
- Assert.assertEquals(2, result.size());
- Assert.assertEquals(2,result.get(0).size());
- Assert.assertEquals(2,result.get(1).size());
- // session ids don't match
- Assert.assertFalse(result.get(0).get(1).equals(result.get(1).get(1)));
+ DateTime dt = new DateTime();
+
+ List<Tuple> result = runSessionizeExec("30m", dt, dt.plusMinutes(28));
+
+ assertEquals(2, result.size());
+ assertEquals(2,result.get(0).size());
+ assertEquals(2,result.get(1).size());
+ assertTrue(result.get(0).get(1).equals(result.get(1).get(1)));
}
-
+
+ @Test
+ public void sessionizeExecTestNonMatching() throws Exception
+ {
+ DateTime dt = new DateTime();
+
+ List<Tuple> result = runSessionizeExec("30m", dt, dt.plusMinutes(31));
+
+ assertEquals(2, result.size());
+ assertEquals(2,result.get(0).size());
+ assertEquals(2,result.get(1).size());
+ assertFalse(result.get(0).get(1).equals(result.get(1).get(1)));
+ }
+
+ @Test
+ public void sessionizeExecTestMs() throws Exception
+ {
+ DateTime dt = new DateTime();
+
+ List<Tuple> result = runSessionizeExec("0.450S", dt, dt.plusMillis(449));
+
+ assertEquals(2, result.size());
+ assertEquals(2,result.get(0).size());
+ assertEquals(2,result.get(1).size());
+ assertTrue(result.get(0).get(1).equals(result.get(1).get(1)));
+
+ result = runSessionizeExec("0.450S", dt, dt.plusMillis(451));
+ assertEquals(2, result.size());
+ assertEquals(2,result.get(0).size());
+ assertEquals(2,result.get(1).size());
+ assertFalse(result.get(0).get(1).equals(result.get(1).get(1)));
+ }
+
@Test
public void sessionizeAccumulateTest() throws Exception
{
Sessionize sessionize = new Sessionize("30m");
- Tuple input = TupleFactory.getInstance().newTuple(1);
- DataBag inputBag = BagFactory.getInstance().newDefaultBag();
- input.set(0,inputBag);
-
- Tuple item;
- List<Tuple> result;
- DateTime dt;
-
- // test same session id
- dt = new DateTime();
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.getMillis());
- inputBag.add(item);
- sessionize.accumulate(input);
- inputBag.clear();
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.plusMinutes(28).getMillis());
- inputBag.add(item);
- sessionize.accumulate(input);
- inputBag.clear();
- result = toList(sessionize.getValue());
-
- Assert.assertEquals(2, result.size());
- Assert.assertEquals(2,result.get(0).size());
- Assert.assertEquals(2,result.get(1).size());
+ DateTime dt = new DateTime();
+
+ sessionize.accumulate(buildInputBag(dt, dt.plusMinutes(28)));
+ List<Tuple> result = toList(sessionize.getValue());
+
+ assertEquals(2, result.size());
+ assertEquals(2,result.get(0).size());
+ assertEquals(2,result.get(1).size());
// session ids match
- Assert.assertTrue(result.get(0).get(1).equals(result.get(1).get(1)));
-
- // test different session id
+ assertTrue(result.get(0).get(1).equals(result.get(1).get(1)));
+
sessionize.cleanup();
+
+ // test with another bag; session ids shouldn't match
dt = new DateTime();
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.getMillis());
- inputBag.add(item);
- sessionize.accumulate(input);
- inputBag.clear();
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.plusMinutes(31).getMillis());
- inputBag.add(item);
- sessionize.accumulate(input);
- inputBag.clear();
- result = toList(sessionize.getValue());
-
- Assert.assertEquals(2, result.size());
- Assert.assertEquals(2,result.get(0).size());
- Assert.assertEquals(2,result.get(1).size());
- // session ids don't match
- Assert.assertFalse(result.get(0).get(1).equals(result.get(1).get(1)));
+ sessionize.accumulate(buildInputBag(dt, dt.plusMinutes(31)));
+ result = toList(sessionize.getValue());
+
+ assertEquals(2, result.size());
+ assertEquals(2,result.get(0).size());
+ assertEquals(2,result.get(1).size());
+ assertFalse(result.get(0).get(1).equals(result.get(1).get(1)));
sessionize.cleanup();
- Assert.assertEquals(0,sessionize.getValue().size());
- }
-
- private List<Tuple> toList(DataBag bag)
- {
- List<Tuple> result = new ArrayList<Tuple>();
- for (Tuple t : bag)
- {
- result.add(t);
- }
- return result;
+ assertEquals(0, sessionize.getValue().size());
}
+
/**
@@ -350,81 +309,84 @@ public class SessionTests extends PigTests
}
@Test
- public void sessionCountExecTest() throws Exception
+ public void sessionCountOneExecTest() throws Exception
{
- SessionCount sessionize = new SessionCount("30m");
- Tuple input = TupleFactory.getInstance().newTuple(1);
- DataBag inputBag = BagFactory.getInstance().newDefaultBag();
- input.set(0,inputBag);
-
- Tuple item;
- DateTime dt;
-
- // test same session id
- inputBag.clear();
- dt = new DateTime();
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.getMillis());
- inputBag.add(item);
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.plusMinutes(28).getMillis());
- inputBag.add(item);
- Assert.assertEquals(1L,sessionize.exec(input).longValue());
-
- // test different session id
- inputBag.clear();
- dt = new DateTime();
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.getMillis());
- inputBag.add(item);
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.plusMinutes(31).getMillis());
- inputBag.add(item);
- Assert.assertEquals(2L,sessionize.exec(input).longValue());
+ DateTime dt = new DateTime();
+ long count = runSessionCountExec("30m", dt, dt.plusMinutes(28));
+ assertEquals(1L, count);
}
-
+
+ @Test
+ public void sessionCountTwoExecTest() throws Exception
+ {
+ DateTime dt = new DateTime();
+ long count = runSessionCountExec("30m", dt, dt.plusMinutes(31));
+ assertEquals(2L, count);
+ }
+
+ @Test
+ public void sessionCountMsExecTest() throws Exception
+ {
+ DateTime dt = new DateTime();
+ long count = runSessionCountExec("0.450S", dt, dt.plusMillis(1));
+ assertEquals(1L, count);
+
+ count = runSessionCountExec("0.450S", dt, dt.plusMillis(451));
+ assertEquals(2L, count);
+ }
+
@Test
public void sessionCountAccumulateTest() throws Exception
{
SessionCount sessionize = new SessionCount("30m");
- Tuple input = TupleFactory.getInstance().newTuple(1);
- DataBag inputBag = BagFactory.getInstance().newDefaultBag();
- input.set(0,inputBag);
-
- Tuple item;
- DateTime dt;
-
- // test same session id
- dt = new DateTime();
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.getMillis());
- inputBag.add(item);
- sessionize.accumulate(input);
- inputBag.clear();
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.plusMinutes(28).getMillis());
- inputBag.add(item);
- sessionize.accumulate(input);
- inputBag.clear();
- Assert.assertEquals(1L,sessionize.getValue().longValue());
-
- // test different session id
+ DateTime dt = new DateTime();
+ sessionize.accumulate(buildInputBag(dt, dt.plusMinutes(28)));
+ assertEquals(1L, sessionize.getValue().longValue());
+
sessionize.cleanup();
dt = new DateTime();
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.getMillis());
- inputBag.add(item);
- sessionize.accumulate(input);
- inputBag.clear();
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.plusMinutes(31).getMillis());
- inputBag.add(item);
- sessionize.accumulate(input);
- inputBag.clear();
- Assert.assertEquals(2L,sessionize.exec(input).longValue());
-
+
+ sessionize.accumulate(buildInputBag(dt, dt.plusMinutes(31)));
+ assertEquals(2L, sessionize.getValue().longValue());
+
sessionize.cleanup();
- Assert.assertEquals(0,sessionize.getValue().longValue());
+ assertEquals(0, sessionize.getValue().longValue());
+ }
+
+ private static List<Tuple> toList(DataBag bag)
+ {
+ List<Tuple> result = new ArrayList<Tuple>();
+ for (Tuple t : bag)
+ {
+ result.add(t);
+ }
+ return result;
+ }
+
+ private static Tuple buildInputBag(DateTime ...dt) throws Exception
+ {
+ Tuple input = TupleFactory.getInstance().newTuple(1);
+ DataBag inputBag = BagFactory.getInstance().newDefaultBag();
+ input.set(0,inputBag);
+
+ for (DateTime time : dt)
+ {
+ inputBag.add(TupleFactory.getInstance().newTuple(Collections.singletonList(time.getMillis())));
+ }
+
+ return input;
+ }
+
+ private static List<Tuple> runSessionizeExec(String timespec, DateTime ...dt) throws Exception
+ {
+ Sessionize sessionize = new Sessionize(timespec);
+ return toList(sessionize.exec(buildInputBag(dt)));
+ }
+
+ private static Long runSessionCountExec(String timespec, DateTime ...dt) throws Exception
+ {
+ SessionCount sessionCount = new SessionCount(timespec);
+ return sessionCount.exec(buildInputBag(dt));
}
}