You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2017/04/18 23:05:28 UTC

spark git commit: [SPARK-20377][SS] Fix JavaStructuredSessionization example

Repository: spark
Updated Branches:
  refs/heads/master f654b39a6 -> 74aa0df8f


[SPARK-20377][SS] Fix JavaStructuredSessionization example

## What changes were proposed in this pull request?

Extra accessors in java bean class causes incorrect encoder generation, which corrupted the state when using timeouts.

## How was this patch tested?
manually ran the example

Author: Tathagata Das <ta...@gmail.com>

Closes #17676 from tdas/SPARK-20377.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/74aa0df8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/74aa0df8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/74aa0df8

Branch: refs/heads/master
Commit: 74aa0df8f7f132b62754e5159262e4a5b9b641ab
Parents: f654b39
Author: Tathagata Das <ta...@gmail.com>
Authored: Tue Apr 18 16:10:40 2017 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue Apr 18 16:10:40 2017 -0700

----------------------------------------------------------------------
 .../sql/streaming/JavaStructuredSessionization.java         | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/74aa0df8/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java
index da3a5df..d3c8516 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java
@@ -76,8 +76,6 @@ public final class JavaStructuredSessionization {
           for (String word : lineWithTimestamp.getLine().split(" ")) {
             eventList.add(new Event(word, lineWithTimestamp.getTimestamp()));
           }
-          System.out.println(
-              "Number of events from " + lineWithTimestamp.getLine() + " = " + eventList.size());
           return eventList.iterator();
         }
       };
@@ -100,7 +98,7 @@ public final class JavaStructuredSessionization {
           // If timed out, then remove session and send final update
           if (state.hasTimedOut()) {
             SessionUpdate finalUpdate = new SessionUpdate(
-                sessionId, state.get().getDurationMs(), state.get().getNumEvents(), true);
+                sessionId, state.get().calculateDuration(), state.get().getNumEvents(), true);
             state.remove();
             return finalUpdate;
 
@@ -133,7 +131,7 @@ public final class JavaStructuredSessionization {
             // Set timeout such that the session will be expired if no data received for 10 seconds
             state.setTimeoutDuration("10 seconds");
             return new SessionUpdate(
-                sessionId, state.get().getDurationMs(), state.get().getNumEvents(), false);
+                sessionId, state.get().calculateDuration(), state.get().getNumEvents(), false);
           }
         }
       };
@@ -215,7 +213,8 @@ public final class JavaStructuredSessionization {
     public long getEndTimestampMs() { return endTimestampMs; }
     public void setEndTimestampMs(long endTimestampMs) { this.endTimestampMs = endTimestampMs; }
 
-    public long getDurationMs() { return endTimestampMs - startTimestampMs; }
+    public long calculateDuration() { return endTimestampMs - startTimestampMs; }
+
     @Override public String toString() {
       return "SessionInfo(numEvents = " + numEvents +
           ", timestamps = " + startTimestampMs + " to " + endTimestampMs + ")";


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org