You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this copy.)
Posted to commits@beam.apache.org by am...@apache.org on 2016/11/29 10:10:18 UTC
[1/2] incubator-beam git commit: [BEAM-1052] Add InputDStream id to MicrobatchSource hashcode.
Repository: incubator-beam
Updated Branches:
refs/heads/master 3f16f2660 -> 3a8b9b521
[BEAM-1052] Add InputDStream id to MicrobatchSource hashcode.
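This avoids collisions between splits of different sources: previously `equals` and `hashCode` used only the split id, so splits with the same index from different sources were treated as equal.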
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a1a4ac0f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a1a4ac0f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a1a4ac0f
Branch: refs/heads/master
Commit: a1a4ac0fc0376aa4c43a4357f3acc930e2b53c94
Parents: 3f16f26
Author: Aviem Zur <av...@gmail.com>
Authored: Tue Nov 29 09:51:12 2016 +0200
Committer: Sela <an...@paypal.com>
Committed: Tue Nov 29 11:49:31 2016 +0200
----------------------------------------------------------------------
.../beam/runners/spark/io/MicrobatchSource.java | 20 ++++++++++++++------
.../beam/runners/spark/io/SourceDStream.java | 3 ++-
.../spark/stateful/StateSpecFunctions.java | 2 +-
3 files changed, 17 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1a4ac0f/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
index 4a174aa..5656375 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
@@ -54,6 +54,7 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
private final Duration maxReadTime;
private final int numInitialSplits;
private final long maxNumRecords;
+ private final int sourceId;
// each split of the underlying UnboundedSource is associated with a (consistent) id
// to match it's corresponding CheckpointMark state.
@@ -63,12 +64,14 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
Duration maxReadTime,
int numInitialSplits,
long maxNumRecords,
- int splitId) {
+ int splitId,
+ int sourceId) {
this.source = source;
this.maxReadTime = maxReadTime;
this.numInitialSplits = numInitialSplits;
this.maxNumRecords = maxNumRecords;
this.splitId = splitId;
+ this.sourceId = sourceId;
}
/**
@@ -98,7 +101,7 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
for (int i = 0; i < numSplits; i++) {
// splits must be stable, and cannot change during consecutive executions
// for example: Kafka should not add partitions if more then one topic is read.
- result.add(new MicrobatchSource<>(splits.get(i), maxReadTime, 1, numRecords[i], i));
+ result.add(new MicrobatchSource<>(splits.get(i), maxReadTime, 1, numRecords[i], i, sourceId));
}
return result;
}
@@ -137,8 +140,8 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
return source.getCheckpointMarkCoder();
}
- public int getSplitId() {
- return splitId;
+ public String getId() {
+ return sourceId + "_" + splitId;
}
@Override
@@ -150,13 +153,18 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
return false;
}
MicrobatchSource<?, ?> that = (MicrobatchSource<?, ?>) o;
-
+ if (sourceId != that.sourceId) {
+ return false;
+ }
return splitId == that.splitId;
+
}
@Override
public int hashCode() {
- return splitId;
+ int result = sourceId;
+ result = 31 * result + splitId;
+ return result;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1a4ac0f/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
index 4e47757..84b247b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
@@ -83,7 +83,8 @@ public class SourceDStream<T, CheckpointMarkT extends UnboundedSource.Checkpoint
@Override
public scala.Option<RDD<Tuple2<Source<T>, CheckpointMarkT>>> compute(Time validTime) {
MicrobatchSource<T, CheckpointMarkT> microbatchSource = new MicrobatchSource<>(
- unboundedSource, boundReadDuration, initialParallelism, rateControlledMaxRecords(), -1);
+ unboundedSource, boundReadDuration, initialParallelism, rateControlledMaxRecords(), -1,
+ id());
RDD<scala.Tuple2<Source<T>, CheckpointMarkT>> rdd = new SourceRDD.Unbounded<>(
ssc().sc(), runtimeContext, microbatchSource);
return scala.Option.apply(rdd);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1a4ac0f/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
index 48849c2..053f4ac 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
@@ -144,7 +144,7 @@ public class StateSpecFunctions {
// close and checkpoint reader.
reader.close();
- LOG.info("Source id {} spent {} msec on reading.", microbatchSource.getSplitId(),
+ LOG.info("Source id {} spent {} msec on reading.", microbatchSource.getId(),
stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
// if the Source does not supply a CheckpointMark skip updating the state.
[2/2] incubator-beam git commit: This closes #1450
Posted by am...@apache.org.
This closes #1450
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3a8b9b52
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3a8b9b52
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3a8b9b52
Branch: refs/heads/master
Commit: 3a8b9b5212972f0128099251884473d06758e2aa
Parents: 3f16f26 a1a4ac0
Author: Sela <an...@paypal.com>
Authored: Tue Nov 29 11:50:51 2016 +0200
Committer: Sela <an...@paypal.com>
Committed: Tue Nov 29 11:50:51 2016 +0200
----------------------------------------------------------------------
.../beam/runners/spark/io/MicrobatchSource.java | 20 ++++++++++++++------
.../beam/runners/spark/io/SourceDStream.java | 3 ++-
.../spark/stateful/StateSpecFunctions.java | 2 +-
3 files changed, 17 insertions(+), 8 deletions(-)
----------------------------------------------------------------------