Posted to commits@beam.apache.org by am...@apache.org on 2016/11/29 10:10:18 UTC

[1/2] incubator-beam git commit: [BEAM-1052] Add InputDStream id to MicrobatchSource hashcode.

Repository: incubator-beam
Updated Branches:
  refs/heads/master 3f16f2660 -> 3a8b9b521


[BEAM-1052] Add InputDStream id to MicrobatchSource hashcode.

Done to avoid collisions between splits of different sources.
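
For context, a minimal sketch of the problem this addresses (hypothetical class and values, not taken from the Beam codebase): before this change, MicrobatchSource identity was based on the split id alone, so splits of two different sources that happened to share a split id would collide in hash-based state lookups. Composing the owning InputDStream id with the split id, as the patched hashCode() does, keeps them distinct.

import java.util.HashMap;
import java.util.Map;

public class SourceSplitKeyExample {

  // Hypothetical stand-in for MicrobatchSource identity after this change.
  static final class SplitKey {
    final int sourceId;
    final int splitId;

    SplitKey(int sourceId, int splitId) {
      this.sourceId = sourceId;
      this.splitId = splitId;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof SplitKey)) {
        return false;
      }
      SplitKey that = (SplitKey) o;
      return sourceId == that.sourceId && splitId == that.splitId;
    }

    @Override
    public int hashCode() {
      // Same composition as the patched hashCode(): 31 * sourceId + splitId.
      return 31 * sourceId + splitId;
    }
  }

  public static void main(String[] args) {
    Map<SplitKey, String> state = new HashMap<>();
    // Two different sources, each with split 0. With splitId-only identity both
    // keys would be equal and the second put would overwrite the first.
    state.put(new SplitKey(1, 0), "checkpoint of source 1, split 0");
    state.put(new SplitKey(2, 0), "checkpoint of source 2, split 0");
    System.out.println(state.size()); // prints 2: no collision across sources
  }
}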


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a1a4ac0f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a1a4ac0f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a1a4ac0f

Branch: refs/heads/master
Commit: a1a4ac0fc0376aa4c43a4357f3acc930e2b53c94
Parents: 3f16f26
Author: Aviem Zur <av...@gmail.com>
Authored: Tue Nov 29 09:51:12 2016 +0200
Committer: Sela <an...@paypal.com>
Committed: Tue Nov 29 11:49:31 2016 +0200

----------------------------------------------------------------------
 .../beam/runners/spark/io/MicrobatchSource.java | 20 ++++++++++++++------
 .../beam/runners/spark/io/SourceDStream.java    |  3 ++-
 .../spark/stateful/StateSpecFunctions.java      |  2 +-
 3 files changed, 17 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1a4ac0f/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
index 4a174aa..5656375 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
@@ -54,6 +54,7 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
   private final Duration maxReadTime;
   private final int numInitialSplits;
   private final long maxNumRecords;
+  private final int sourceId;
 
   // each split of the underlying UnboundedSource is associated with a (consistent) id
  // to match its corresponding CheckpointMark state.
@@ -63,12 +64,14 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
                    Duration maxReadTime,
                    int numInitialSplits,
                    long maxNumRecords,
-                   int splitId) {
+                   int splitId,
+                   int sourceId) {
     this.source = source;
     this.maxReadTime = maxReadTime;
     this.numInitialSplits = numInitialSplits;
     this.maxNumRecords = maxNumRecords;
     this.splitId = splitId;
+    this.sourceId = sourceId;
   }
 
   /**
@@ -98,7 +101,7 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
     for (int i = 0; i < numSplits; i++) {
       // splits must be stable, and cannot change during consecutive executions
      // for example: Kafka should not add partitions if more than one topic is read.
-      result.add(new MicrobatchSource<>(splits.get(i), maxReadTime, 1, numRecords[i], i));
+      result.add(new MicrobatchSource<>(splits.get(i), maxReadTime, 1, numRecords[i], i, sourceId));
     }
     return result;
   }
@@ -137,8 +140,8 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
     return source.getCheckpointMarkCoder();
   }
 
-  public int getSplitId() {
-    return splitId;
+  public String getId() {
+    return sourceId + "_" + splitId;
   }
 
   @Override
@@ -150,13 +153,18 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
       return false;
     }
     MicrobatchSource<?, ?> that = (MicrobatchSource<?, ?>) o;
-
+    if (sourceId != that.sourceId) {
+      return false;
+    }
     return splitId == that.splitId;
+
   }
 
   @Override
   public int hashCode() {
-    return splitId;
+    int result = sourceId;
+    result = 31 * result + splitId;
+    return result;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1a4ac0f/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
index 4e47757..84b247b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
@@ -83,7 +83,8 @@ public class SourceDStream<T, CheckpointMarkT extends UnboundedSource.Checkpoint
   @Override
   public scala.Option<RDD<Tuple2<Source<T>, CheckpointMarkT>>> compute(Time validTime) {
     MicrobatchSource<T, CheckpointMarkT> microbatchSource = new MicrobatchSource<>(
-        unboundedSource, boundReadDuration, initialParallelism, rateControlledMaxRecords(), -1);
+        unboundedSource, boundReadDuration, initialParallelism, rateControlledMaxRecords(), -1,
+        id());
     RDD<scala.Tuple2<Source<T>, CheckpointMarkT>> rdd = new SourceRDD.Unbounded<>(
         ssc().sc(), runtimeContext, microbatchSource);
     return scala.Option.apply(rdd);
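
As a side note, id() here is the unique stream id that Spark assigns to each InputDStream, so every SourceDStream contributes a distinct sourceId. A tiny sketch (hypothetical values) of the "<sourceId>_<splitId>" identifier that getId() now returns and that StateSpecFunctions logs:

public class SplitIdFormatExample {
  public static void main(String[] args) {
    int sourceId = 2;  // e.g. the Spark InputDStream id returned by id()
    int splitId = 0;
    System.out.println(sourceId + "_" + splitId);  // prints "2_0"
  }
}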

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1a4ac0f/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
index 48849c2..053f4ac 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
@@ -144,7 +144,7 @@ public class StateSpecFunctions {
 
           // close and checkpoint reader.
           reader.close();
-          LOG.info("Source id {} spent {} msec on reading.", microbatchSource.getSplitId(),
+          LOG.info("Source id {} spent {} msec on reading.", microbatchSource.getId(),
               stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
 
           // if the Source does not supply a CheckpointMark skip updating the state.


[2/2] incubator-beam git commit: This closes #1450

Posted by am...@apache.org.
This closes #1450


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3a8b9b52
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3a8b9b52
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3a8b9b52

Branch: refs/heads/master
Commit: 3a8b9b5212972f0128099251884473d06758e2aa
Parents: 3f16f26 a1a4ac0
Author: Sela <an...@paypal.com>
Authored: Tue Nov 29 11:50:51 2016 +0200
Committer: Sela <an...@paypal.com>
Committed: Tue Nov 29 11:50:51 2016 +0200

----------------------------------------------------------------------
 .../beam/runners/spark/io/MicrobatchSource.java | 20 ++++++++++++++------
 .../beam/runners/spark/io/SourceDStream.java    |  3 ++-
 .../spark/stateful/StateSpecFunctions.java      |  2 +-
 3 files changed, 17 insertions(+), 8 deletions(-)
----------------------------------------------------------------------