You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2018/03/27 04:50:27 UTC
[beam] 01/03: [BEAM-3087] Make reader state update and element emission atomic
This is an automated email from the ASF dual-hosted git repository.
aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
commit fef8f54be417be88730ebf2d0e8db8c55fbd013f
Author: Grzegorz Kołakowski <gr...@getindata.com>
AuthorDate: Wed Feb 21 10:31:53 2018 +0100
[BEAM-3087] Make reader state update and element emission atomic
Reader advancement should be considered as reader state update too.
Therefore, the reader's advancement and element emission are in the
same synchronized section.
---
.../streaming/io/UnboundedSourceWrapper.java | 67 ++++++++++++----------
1 file changed, 37 insertions(+), 30 deletions(-)
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index fc23c01..3f04b6c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -186,7 +186,7 @@ public class UnboundedSourceWrapper<
if (isRestored) {
// restore the splitSources from the checkpoint to ensure consistent ordering
- for (KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> restored:
+ for (KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> restored :
stateForCheckpoint.get()) {
localSplitSources.add(restored.getKey());
localReaders.add(restored.getKey().createReader(
@@ -229,19 +229,25 @@ public class UnboundedSourceWrapper<
// the easy case, we just read from one reader
UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(0);
- boolean dataAvailable = readerInvoker.invokeStart(reader);
- if (dataAvailable) {
- emitElement(ctx, reader);
+ synchronized (ctx.getCheckpointLock()) {
+ boolean dataAvailable = readerInvoker.invokeStart(reader);
+ if (dataAvailable) {
+ emitElement(ctx, reader);
+ }
}
setNextWatermarkTimer(this.runtimeContext);
while (isRunning) {
- dataAvailable = readerInvoker.invokeAdvance(reader);
+ boolean dataAvailable;
+ synchronized (ctx.getCheckpointLock()) {
+ dataAvailable = readerInvoker.invokeAdvance(reader);
- if (dataAvailable) {
- emitElement(ctx, reader);
- } else {
+ if (dataAvailable) {
+ emitElement(ctx, reader);
+ }
+ }
+ if (!dataAvailable) {
Thread.sleep(50);
}
}
@@ -254,9 +260,11 @@ public class UnboundedSourceWrapper<
// start each reader and emit data if immediately available
for (UnboundedSource.UnboundedReader<OutputT> reader : localReaders) {
- boolean dataAvailable = readerInvoker.invokeStart(reader);
- if (dataAvailable) {
- emitElement(ctx, reader);
+ synchronized (ctx.getCheckpointLock()) {
+ boolean dataAvailable = readerInvoker.invokeStart(reader);
+ if (dataAvailable) {
+ emitElement(ctx, reader);
+ }
}
}
@@ -267,11 +275,13 @@ public class UnboundedSourceWrapper<
boolean hadData = false;
while (isRunning) {
UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(currentReader);
- boolean dataAvailable = readerInvoker.invokeAdvance(reader);
- if (dataAvailable) {
- emitElement(ctx, reader);
- hadData = true;
+ synchronized (ctx.getCheckpointLock()) {
+ boolean dataAvailable = readerInvoker.invokeAdvance(reader);
+ if (dataAvailable) {
+ emitElement(ctx, reader);
+ hadData = true;
+ }
}
currentReader = (currentReader + 1) % numReaders;
@@ -321,24 +331,21 @@ public class UnboundedSourceWrapper<
UnboundedSource.UnboundedReader<OutputT> reader) {
// make sure that reader state update and element emission are atomic
// with respect to snapshots
- synchronized (ctx.getCheckpointLock()) {
-
- OutputT item = reader.getCurrent();
- byte[] recordId = reader.getCurrentRecordId();
- Instant timestamp = reader.getCurrentTimestamp();
-
- WindowedValue<ValueWithRecordId<OutputT>> windowedValue =
- WindowedValue.of(new ValueWithRecordId<>(item, recordId), timestamp,
- GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
- ctx.collectWithTimestamp(windowedValue, timestamp.getMillis());
- }
+ OutputT item = reader.getCurrent();
+ byte[] recordId = reader.getCurrentRecordId();
+ Instant timestamp = reader.getCurrentTimestamp();
+
+ WindowedValue<ValueWithRecordId<OutputT>> windowedValue =
+ WindowedValue.of(new ValueWithRecordId<>(item, recordId), timestamp,
+ GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+ ctx.collectWithTimestamp(windowedValue, timestamp.getMillis());
}
@Override
public void close() throws Exception {
super.close();
if (localReaders != null) {
- for (UnboundedSource.UnboundedReader<OutputT> reader: localReaders) {
+ for (UnboundedSource.UnboundedReader<OutputT> reader : localReaders) {
reader.close();
}
}
@@ -394,8 +401,8 @@ public class UnboundedSourceWrapper<
int diff = pendingCheckpoints.size() - MAX_NUMBER_PENDING_CHECKPOINTS;
if (diff >= 0) {
for (Iterator<Long> iterator = pendingCheckpoints.keySet().iterator();
- diff >= 0;
- diff--) {
+ diff >= 0;
+ diff--) {
iterator.next();
iterator.remove();
}
@@ -434,7 +441,7 @@ public class UnboundedSourceWrapper<
synchronized (context.getCheckpointLock()) {
// find minimum watermark over all localReaders
long watermarkMillis = Long.MAX_VALUE;
- for (UnboundedSource.UnboundedReader<OutputT> reader: localReaders) {
+ for (UnboundedSource.UnboundedReader<OutputT> reader : localReaders) {
Instant watermark = reader.getWatermark();
if (watermark != null) {
watermarkMillis = Math.min(watermark.getMillis(), watermarkMillis);
--
To stop receiving notification emails like this one, please contact
aljoscha@apache.org.