You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by st...@apache.org on 2017/05/09 07:10:03 UTC
[1/2] beam git commit: [BEAM-2095] Made SourceRDD hasNext idempotent
Repository: beam
Updated Branches:
refs/heads/master d96fd173c -> e10fbdaa2
[BEAM-2095] Made SourceRDD hasNext idempotent
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/81b89acb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/81b89acb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/81b89acb
Branch: refs/heads/master
Commit: 81b89acb46d79dabc9c21d8421868c79a609a9dc
Parents: d96fd17
Author: Stas Levin <st...@apache.org>
Authored: Mon May 1 07:30:49 2017 +0300
Committer: Stas Levin <st...@apache.org>
Committed: Tue May 9 10:09:31 2017 +0300
----------------------------------------------------------------------
.../apache/beam/runners/spark/io/SourceRDD.java | 173 ++++++++++++-------
.../spark/io/ReaderToIteratorAdapterTest.java | 145 ++++++++++++++++
2 files changed, 260 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/81b89acb/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
index 71a19e7..aa89c59 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
@@ -19,12 +19,15 @@
package org.apache.beam.runners.spark.io;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.NoSuchElementException;
import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.sdk.io.BoundedSource;
@@ -47,6 +50,7 @@ import org.apache.spark.rdd.RDD;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
+import scala.collection.JavaConversions;
/**
* Classes implementing Beam {@link Source} {@link RDD}s.
@@ -118,80 +122,133 @@ public class SourceRDD {
}
}
+ private BoundedSource.BoundedReader<T> createReader(SourcePartition<T> partition) {
+ try {
+ return ((BoundedSource<T>) partition.source).createReader(
+ runtimeContext.getPipelineOptions());
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to create reader from a BoundedSource.", e);
+ }
+ }
+
@Override
public scala.collection.Iterator<WindowedValue<T>> compute(final Partition split,
- TaskContext context) {
+ final TaskContext context) {
final MetricsContainer metricsContainer = metricsAccum.localValue().getContainer(stepName);
- final Iterator<WindowedValue<T>> iter = new Iterator<WindowedValue<T>>() {
- @SuppressWarnings("unchecked")
- SourcePartition<T> partition = (SourcePartition<T>) split;
- BoundedSource.BoundedReader<T> reader = createReader(partition);
-
- private boolean finished = false;
- private boolean started = false;
- private boolean closed = false;
-
- @Override
- public boolean hasNext() {
- // Add metrics container to the scope of org.apache.beam.sdk.io.Source.Reader methods
- // since they may report metrics.
- try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(metricsContainer)) {
- try {
- if (!started) {
- started = true;
- finished = !reader.start();
- } else {
- finished = !reader.advance();
- }
- if (finished) {
- // safely close the reader if there are no more elements left to read.
- closeIfNotClosed();
- }
- return !finished;
- } catch (IOException e) {
- closeIfNotClosed();
- throw new RuntimeException("Failed to read from reader.", e);
+ @SuppressWarnings("unchecked")
+ final BoundedSource.BoundedReader<T> reader = createReader((SourcePartition<T>) split);
+
+ final Iterator<WindowedValue<T>> readerIterator =
+ new ReaderToIteratorAdapter<>(metricsContainer, reader);
+
+ return new InterruptibleIterator<>(context, JavaConversions.asScalaIterator(readerIterator));
+ }
+
+ /**
+ * Exposes an <code>Iterator</code>&lt;{@link WindowedValue}&gt; interface on top of a
+ * {@link Source.Reader}.
+ * <p>
+ * <code>hasNext</code> is idempotent and returns <code>true</code> iff further items are
+ * available for reading using the underlying reader.
+ * Consequently, when the reader is closed, or when the reader has no further elements
+ * available (i.e., {@link Source.Reader#start()} or {@link Source.Reader#advance()}
+ * returned <code>false</code>), <code>hasNext</code> returns <code>false</code>.
+ * </p>
+ * <p>
+ * Since this is a read-only iterator, an attempt to call <code>remove</code> will throw an
+ * <code>UnsupportedOperationException</code>.
+ * </p>
+ */
+ @VisibleForTesting
+ static class ReaderToIteratorAdapter<T> implements Iterator<WindowedValue<T>> {
+
+ private static final boolean FAILED_TO_OBTAIN_NEXT = false;
+ private static final boolean SUCCESSFULLY_OBTAINED_NEXT = true;
+
+ private final MetricsContainer metricsContainer;
+ private final Source.Reader<T> reader;
+
+ private boolean started = false;
+ private boolean closed = false;
+ private WindowedValue<T> next = null;
+
+ ReaderToIteratorAdapter(final MetricsContainer metricsContainer,
+ final Source.Reader<T> reader) {
+ this.metricsContainer = metricsContainer;
+ this.reader = reader;
+ }
+
+ private boolean tryProduceNext() {
+ try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(metricsContainer)) {
+ if (closed) {
+ return FAILED_TO_OBTAIN_NEXT;
+ } else {
+ checkState(next == null, "unexpected non-null value for next");
+ if (seekNext()) {
+ next = WindowedValue.timestampedValueInGlobalWindow(reader.getCurrent(),
+ reader.getCurrentTimestamp());
+ return SUCCESSFULLY_OBTAINED_NEXT;
+ } else {
+ close();
+ return FAILED_TO_OBTAIN_NEXT;
}
- } catch (IOException e) {
- throw new RuntimeException(e);
}
+ } catch (final Exception e) {
+ throw new RuntimeException("Failed to read data.", e);
+ }
+ }
+
+ private void close() {
+ closed = true;
+ try {
+ reader.close();
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
}
+ }
- @Override
- public WindowedValue<T> next() {
- return WindowedValue.timestampedValueInGlobalWindow(reader.getCurrent(),
- reader.getCurrentTimestamp());
+ private boolean seekNext() throws IOException {
+ if (!started) {
+ started = true;
+ return reader.start();
+ } else {
+ return !closed && reader.advance();
}
+ }
- @Override
- public void remove() {
- throw new UnsupportedOperationException("Remove from partition iterator is not allowed.");
+ private WindowedValue<T> consumeCurrent() {
+ if (next == null) {
+ throw new NoSuchElementException();
+ } else {
+ final WindowedValue<T> current = next;
+ next = null;
+ return current;
}
+ }
- private void closeIfNotClosed() {
- if (!closed) {
- closed = true;
- try {
- reader.close();
- } catch (IOException e) {
- throw new RuntimeException("Failed to close Reader.", e);
- }
- }
+ private WindowedValue<T> consumeNext() {
+ if (next == null) {
+ tryProduceNext();
}
- };
+ return consumeCurrent();
+ }
- return new InterruptibleIterator<>(context,
- scala.collection.JavaConversions.asScalaIterator(iter));
- }
+ @Override
+ public boolean hasNext() {
+ return next != null || tryProduceNext();
+ }
- private BoundedSource.BoundedReader<T> createReader(SourcePartition<T> partition) {
- try {
- return ((BoundedSource<T>) partition.source).createReader(
- runtimeContext.getPipelineOptions());
- } catch (IOException e) {
- throw new RuntimeException("Failed to create reader from a BoundedSource.", e);
+ @Override
+ public WindowedValue<T> next() {
+ return consumeNext();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
}
+
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/81b89acb/runners/spark/src/test/java/org/apache/beam/runners/spark/io/ReaderToIteratorAdapterTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/ReaderToIteratorAdapterTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/ReaderToIteratorAdapterTest.java
new file mode 100644
index 0000000..86f35ba
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/ReaderToIteratorAdapterTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.io;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Tests {@link SourceRDD.Bounded.ReaderToIteratorAdapter}: hasNext idempotency and drain/close.
+ */
+public class ReaderToIteratorAdapterTest {
+
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
+
+ private static class TestReader extends Source.Reader<Integer> { // yields START..LIMIT-1
+
+ static final int LIMIT = 4; // exclusive upper bound: values stop before LIMIT
+ static final int START = 1; // first value returned after start()
+
+ private Integer current = START - 1; // one below START so the first advance() lands on START
+ private boolean closed = false;
+ private boolean drained = false; // true once advance() has reached LIMIT
+
+ boolean isClosed() {
+ return closed;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ return advance();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ checkState(!drained && !closed); // advancing after drain or close is misuse; fail loudly
+ drained = ++current >= LIMIT;
+ return !drained;
+ }
+
+ @Override
+ public Integer getCurrent() throws NoSuchElementException {
+ checkState(!drained && !closed); // no current element once drained or closed
+ return current;
+ }
+
+ @Override
+ public Instant getCurrentTimestamp() throws NoSuchElementException {
+ checkState(!drained && !closed);
+ return Instant.now(); // timestamps are not asserted by these tests
+ }
+
+ @Override
+ public void close() throws IOException {
+ checkState(!closed); // a second close() fails, catching redundant closes by the adapter
+ closed = true;
+ }
+
+ @Override
+ public Source<Integer> getCurrentSource() {
+ return null; // the adapter under test never calls getCurrentSource()
+ }
+ }
+
+ private final TestReader testReader = new TestReader();
+
+ private final SourceRDD.Bounded.ReaderToIteratorAdapter<Integer> readerIterator =
+ new SourceRDD.Bounded.ReaderToIteratorAdapter<>(new MetricsContainer(""), testReader);
+
+ private void assertReaderRange(final int start, final int end) { // end is exclusive
+ for (int i = start; i < end; i++) {
+ assertThat(readerIterator.next().getValue(), is(i));
+ }
+ }
+
+ @Test
+ public void testReaderIsClosedAfterDrainage() throws Exception {
+ assertReaderRange(TestReader.START, TestReader.LIMIT);
+
+ assertThat(readerIterator.hasNext(), is(false));
+
+ // the reader gets closed once hasNext finds that no more elements remain
+ assertThat(testReader.isClosed(), is(true));
+ }
+
+ @Test
+ public void testNextWhenDrainedThrows() throws Exception {
+ assertReaderRange(TestReader.START, TestReader.LIMIT);
+
+ exception.expect(NoSuchElementException.class);
+ readerIterator.next();
+ }
+
+ @Test
+ public void testHasNextIdempotencyCombo() throws Exception { // assumes START == 1, LIMIT == 4
+ assertThat(readerIterator.hasNext(), is(true));
+ assertThat(readerIterator.hasNext(), is(true));
+
+ assertThat(readerIterator.next().getValue(), is(1));
+
+ assertThat(readerIterator.hasNext(), is(true));
+ assertThat(readerIterator.hasNext(), is(true));
+ assertThat(readerIterator.hasNext(), is(true));
+
+ assertThat(readerIterator.next().getValue(), is(2));
+ assertThat(readerIterator.next().getValue(), is(3));
+
+ // drained: values 1, 2 and 3 have all been consumed
+
+ assertThat(readerIterator.hasNext(), is(false));
+ assertThat(readerIterator.hasNext(), is(false));
+
+ // next() on an exhausted iterator must throw NoSuchElementException
+
+ exception.expect(NoSuchElementException.class);
+ readerIterator.next();
+ }
+
+}
[2/2] beam git commit: This closes #2854
Posted by st...@apache.org.
This closes #2854
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e10fbdaa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e10fbdaa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e10fbdaa
Branch: refs/heads/master
Commit: e10fbdaa2db480efb45074d59341ab5256a42d80
Parents: d96fd17 81b89ac
Author: Stas Levin <st...@apache.org>
Authored: Tue May 9 10:09:42 2017 +0300
Committer: Stas Levin <st...@apache.org>
Committed: Tue May 9 10:09:42 2017 +0300
----------------------------------------------------------------------
.../apache/beam/runners/spark/io/SourceRDD.java | 173 ++++++++++++-------
.../spark/io/ReaderToIteratorAdapterTest.java | 145 ++++++++++++++++
2 files changed, 260 insertions(+), 58 deletions(-)
----------------------------------------------------------------------