You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/18 00:50:36 UTC
[1/2] incubator-beam git commit: Closes #835
Repository: incubator-beam
Updated Branches:
refs/heads/master 7ac8d6ded -> bfa3b70ab
Closes #835
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bfa3b70a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bfa3b70a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bfa3b70a
Branch: refs/heads/master
Commit: bfa3b70ab63c730a320d825ab9f2f93fee748a1c
Parents: 7ac8d6d 2c8a654
Author: Dan Halperin <dh...@google.com>
Authored: Wed Aug 17 17:50:07 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Aug 17 17:50:07 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/testing/SourceTestUtils.java | 132 +++++++++++++++++++
.../beam/sdk/testing/SourceTestUtilsTest.java | 66 ++++++++++
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 12 +-
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 31 +++++
4 files changed, 235 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
[2/2] incubator-beam git commit: Fix NPE in
BigQueryIO.TransformingReader
Posted by dh...@apache.org.
Fix NPE in BigQueryIO.TransformingReader
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2c8a6546
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2c8a6546
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2c8a6546
Branch: refs/heads/master
Commit: 2c8a6546af2adb1f7694f29a092338898f851d16
Parents: 7ac8d6d
Author: Pei He <pe...@google.com>
Authored: Mon Aug 15 17:23:20 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Aug 17 17:50:07 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/testing/SourceTestUtils.java | 132 +++++++++++++++++++
.../beam/sdk/testing/SourceTestUtilsTest.java | 66 ++++++++++
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 12 +-
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 31 +++++
4 files changed, 235 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c8a6546/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
index e0b8890..9ce9c5e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.testing;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
@@ -27,10 +29,15 @@ import static org.junit.Assert.assertTrue;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.KV;
+import com.google.common.collect.ImmutableList;
+
+import org.joda.time.Instant;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +45,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
@@ -45,6 +53,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import javax.annotation.Nullable;
+
/**
* Helper functions and test harnesses for checking correctness of {@link Source}
* implementations.
@@ -673,4 +683,126 @@ public class SourceTestUtils {
numItemsToReadBeforeSplitting, fraction, options);
return (res.numResidualItems > 0);
}
+
+ /**
+ * Returns an equivalent unsplittable {@code BoundedSource<T>} wrapping {@code boundedSource}.
+ *
+ * <p>It forwards most methods to the given non-null {@code boundedSource}, except:
+ * <ol>
+ * <li> {@link BoundedSource#splitIntoBundles} rejects initial splitting
+ * by returning a single-element list that contains only the wrapper itself.
+ * <li> {@link BoundedReader#splitAtFraction} rejects dynamic splitting by returning null.
+ * </ol>
+ */
+ public static <T> BoundedSource<T> toUnsplittableSource(BoundedSource<T> boundedSource) {
+ return new UnsplittableSource<>(boundedSource);
+ }
+
+ private static class UnsplittableSource<T> extends BoundedSource<T> { // delegating, never splits
+
+ private final BoundedSource<T> boundedSource; // the wrapped source; null is rejected below
+
+ private UnsplittableSource(BoundedSource<T> boundedSource) {
+ this.boundedSource = checkNotNull(boundedSource, "boundedSource");
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ this.boundedSource.populateDisplayData(builder); // display data comes from the wrapped source
+ }
+
+ @Override
+ public List<? extends BoundedSource<T>> splitIntoBundles(
+ long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+ return ImmutableList.of(this); // both arguments are ignored: the only bundle is this source
+ }
+
+ @Override
+ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+ return boundedSource.getEstimatedSizeBytes(options);
+ }
+
+ @Override
+ public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+ return boundedSource.producesSortedKeys(options);
+ }
+
+ @Override
+ public BoundedReader<T> createReader(PipelineOptions options) throws IOException {
+ return new UnsplittableReader<>(boundedSource, boundedSource.createReader(options)); // wrap
+ }
+
+ @Override
+ public void validate() {
+ boundedSource.validate();
+ }
+
+ @Override
+ public Coder<T> getDefaultOutputCoder() {
+ return boundedSource.getDefaultOutputCoder();
+ }
+
+ private static class UnsplittableReader<T> extends BoundedReader<T> { // delegating reader
+
+ private final BoundedSource<T> boundedSource; // the wrapped source, not the UnsplittableSource
+ private final BoundedReader<T> boundedReader; // reader created by the wrapped source
+
+ private UnsplittableReader(BoundedSource<T> boundedSource, BoundedReader<T> boundedReader) {
+ this.boundedSource = checkNotNull(boundedSource, "boundedSource");
+ this.boundedReader = checkNotNull(boundedReader, "boundedReader");
+ }
+
+ @Override
+ public BoundedSource<T> getCurrentSource() {
+ return boundedSource; // NOTE(review): yields the wrapped source, not the wrapper - confirm
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ return boundedReader.start();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ return boundedReader.advance();
+ }
+
+ @Override
+ public T getCurrent() throws NoSuchElementException {
+ return boundedReader.getCurrent();
+ }
+
+ @Override
+ public void close() throws IOException {
+ boundedReader.close();
+ }
+
+ @Override
+ @Nullable
+ public BoundedSource<T> splitAtFraction(double fraction) {
+ return null; // always refuses the split; the fraction is never inspected or forwarded
+ }
+
+ @Override
+ @Nullable
+ public Double getFractionConsumed() {
+ return boundedReader.getFractionConsumed(); // progress is still reported by the delegate
+ }
+
+ @Override
+ public long getSplitPointsConsumed() {
+ return boundedReader.getSplitPointsConsumed();
+ }
+
+ @Override
+ public long getSplitPointsRemaining() {
+ return boundedReader.getSplitPointsRemaining();
+ }
+
+ @Override
+ public Instant getCurrentTimestamp() throws NoSuchElementException {
+ return boundedReader.getCurrentTimestamp();
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c8a6546/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SourceTestUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SourceTestUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SourceTestUtilsTest.java
new file mode 100644
index 0000000..f2b332b
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SourceTestUtilsTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+import com.google.common.collect.Sets;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Tests for {@link SourceTestUtils}; currently covers {@code toUnsplittableSource} only.
+ */
+@RunWith(JUnit4.class)
+public class SourceTestUtilsTest {
+
+ @Test
+ public void testToUnsplittableSource() throws Exception {
+ PipelineOptions options = PipelineOptionsFactory.create();
+ BoundedSource<Long> baseSource = CountingSource.upTo(100);
+ BoundedSource<Long> unsplittableSource = SourceTestUtils.toUnsplittableSource(baseSource);
+ List<?> splits = unsplittableSource.splitIntoBundles(1, options); // ask for 1-byte bundles
+ assertEquals(splits.size(), 1); // NOTE(review): JUnit order is (expected, actual) - swapped?
+ assertEquals(splits.get(0), unsplittableSource); // NOTE(review): same argument-order concern
+
+ BoundedReader<Long> unsplittableReader = unsplittableSource.createReader(options);
+ assertEquals(0, unsplittableReader.getFractionConsumed(), 1e-15); // nothing consumed yet
+
+ Set<Long> expected = Sets.newHashSet(SourceTestUtils.readFromSource(baseSource, options));
+ Set<Long> actual = Sets.newHashSet();
+ actual.addAll(SourceTestUtils.readNItemsFromUnstartedReader(unsplittableReader, 40));
+ assertNull(unsplittableReader.splitAtFraction(0.5)); // a split requested mid-read is refused
+ actual.addAll(SourceTestUtils.readRemainingFromReader(unsplittableReader, true /* started */));
+ assertEquals(1, unsplittableReader.getFractionConsumed(), 1e-15); // everything consumed
+
+ assertEquals(100, actual.size());
+ assertEquals(Sets.newHashSet(expected), Sets.newHashSet(actual)); // NOTE(review): copies of Sets
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c8a6546/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index ce04467..e61dcca 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1123,9 +1123,9 @@ public class BigQueryIO {
BoundedSource<T> boundedSource,
SerializableFunction<T, V> function,
Coder<V> outputCoder) {
- this.boundedSource = boundedSource;
- this.function = function;
- this.outputCoder = outputCoder;
+ this.boundedSource = checkNotNull(boundedSource, "boundedSource");
+ this.function = checkNotNull(function, "function");
+ this.outputCoder = checkNotNull(outputCoder, "outputCoder");
}
@Override
@@ -1170,7 +1170,7 @@ public class BigQueryIO {
private final BoundedReader<T> boundedReader;
private TransformingReader(BoundedReader<T> boundedReader) {
- this.boundedReader = boundedReader;
+ this.boundedReader = checkNotNull(boundedReader, "boundedReader");
}
@Override
@@ -1201,8 +1201,8 @@ public class BigQueryIO {
@Override
public synchronized BoundedSource<V> splitAtFraction(double fraction) {
- return new TransformingSource<>(
- boundedReader.splitAtFraction(fraction), function, outputCoder);
+ BoundedSource<T> split = boundedReader.splitAtFraction(fraction);
+ return split == null ? null : new TransformingSource<>(split, function, outputCoder);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c8a6546/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index fcaa054..ca60696 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -1216,6 +1216,37 @@ public class BigQueryIOTest implements Serializable {
}
@Test
+ public void testTransformingSourceUnsplittable() throws Exception {
+ int numElements = 10000;
+ @SuppressWarnings("deprecation") // NOTE(review): presumably for CountingSource.upTo - confirm
+ BoundedSource<Long> longSource =
+ SourceTestUtils.toUnsplittableSource(CountingSource.upTo(numElements));
+ SerializableFunction<Long, String> toStringFn =
+ new SerializableFunction<Long, String>() {
+ @Override
+ public String apply(Long input) {
+ return input.toString();
+ }
+ };
+ BoundedSource<String> stringSource =
+ new TransformingSource<>(longSource, toStringFn, StringUtf8Coder.of());
+
+ List<String> expected = Lists.newArrayList();
+ for (int i = 0; i < numElements; i++) { // expected output: "0" through "9999", in order
+ expected.add(String.valueOf(i));
+ }
+
+ PipelineOptions options = PipelineOptionsFactory.create();
+ Assert.assertThat(
+ SourceTestUtils.readFromSource(stringSource, options), CoreMatchers.is(expected));
+ SourceTestUtils.assertSplitAtFractionBehavior( // going by the outcome name, a failed split is OK
+ stringSource, 100, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
+
+ SourceTestUtils.assertSourcesEqualReferenceSource( // compares the bundles against stringSource
+ stringSource, stringSource.splitIntoBundles(100, options), options);
+ }
+
+ @Test
@Category(RunnableOnService.class)
public void testPassThroughThenCleanup() throws Exception {
Pipeline p = TestPipeline.create();