You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/18 00:50:36 UTC

[1/2] incubator-beam git commit: Closes #835

Repository: incubator-beam
Updated Branches:
  refs/heads/master 7ac8d6ded -> bfa3b70ab


Closes #835


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bfa3b70a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bfa3b70a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bfa3b70a

Branch: refs/heads/master
Commit: bfa3b70ab63c730a320d825ab9f2f93fee748a1c
Parents: 7ac8d6d 2c8a654
Author: Dan Halperin <dh...@google.com>
Authored: Wed Aug 17 17:50:07 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Aug 17 17:50:07 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/testing/SourceTestUtils.java       | 132 +++++++++++++++++++
 .../beam/sdk/testing/SourceTestUtilsTest.java   |  66 ++++++++++
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |  12 +-
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  31 +++++
 4 files changed, 235 insertions(+), 6 deletions(-)
----------------------------------------------------------------------



[2/2] incubator-beam git commit: Fix NPE in BigQueryIO.TransformingReader

Posted by dh...@apache.org.
Fix NPE in BigQueryIO.TransformingReader


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2c8a6546
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2c8a6546
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2c8a6546

Branch: refs/heads/master
Commit: 2c8a6546af2adb1f7694f29a092338898f851d16
Parents: 7ac8d6d
Author: Pei He <pe...@google.com>
Authored: Mon Aug 15 17:23:20 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Aug 17 17:50:07 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/testing/SourceTestUtils.java       | 132 +++++++++++++++++++
 .../beam/sdk/testing/SourceTestUtilsTest.java   |  66 ++++++++++
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |  12 +-
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  31 +++++
 4 files changed, 235 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c8a6546/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
index e0b8890..9ce9c5e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.testing;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
@@ -27,10 +29,15 @@ import static org.junit.Assert.assertTrue;
 
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.KV;
 
+import com.google.common.collect.ImmutableList;
+
+import org.joda.time.Instant;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +45,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
@@ -45,6 +53,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import javax.annotation.Nullable;
+
 /**
  * Helper functions and test harnesses for checking correctness of {@link Source}
  * implementations.
@@ -673,4 +683,126 @@ public class SourceTestUtils {
         numItemsToReadBeforeSplitting, fraction, options);
     return (res.numResidualItems > 0);
   }
+
+  /**
+   * Returns an equivalent unsplittable {@code BoundedSource<T>}.
+   *
+   * <p>It forwards most methods to the given {@code boundedSource}, except:
+   * <ol>
+   * <li> {@link BoundedSource#splitIntoBundles} rejects initial splitting
+   * by returning itself in a list.
+   * <li> {@link BoundedReader#splitAtFraction} rejects dynamic splitting by returning null.
+   * </ol>
+   */
+  public static <T> BoundedSource<T> toUnsplittableSource(BoundedSource<T> boundedSource) {
+    return new UnsplittableSource<>(boundedSource);
+  }
+
+  private static class UnsplittableSource<T> extends BoundedSource<T> {
+
+    private final BoundedSource<T> boundedSource;
+
+    private UnsplittableSource(BoundedSource<T> boundedSource) {
+      this.boundedSource = checkNotNull(boundedSource, "boundedSource");
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      this.boundedSource.populateDisplayData(builder);
+    }
+
+    @Override
+    public List<? extends BoundedSource<T>> splitIntoBundles(
+        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+      return ImmutableList.of(this);
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+      return boundedSource.getEstimatedSizeBytes(options);
+    }
+
+    @Override
+    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+      return boundedSource.producesSortedKeys(options);
+    }
+
+    @Override
+    public BoundedReader<T> createReader(PipelineOptions options) throws IOException {
+      return new UnsplittableReader<>(boundedSource, boundedSource.createReader(options));
+    }
+
+    @Override
+    public void validate() {
+      boundedSource.validate();
+    }
+
+    @Override
+    public Coder<T> getDefaultOutputCoder() {
+      return boundedSource.getDefaultOutputCoder();
+    }
+
+    private static class UnsplittableReader<T> extends BoundedReader<T> {
+
+      private final BoundedSource<T> boundedSource;
+      private final BoundedReader<T> boundedReader;
+
+      private UnsplittableReader(BoundedSource<T> boundedSource, BoundedReader<T> boundedReader) {
+        this.boundedSource = checkNotNull(boundedSource, "boundedSource");
+        this.boundedReader = checkNotNull(boundedReader, "boundedReader");
+      }
+
+      @Override
+      public BoundedSource<T> getCurrentSource() {
+        return boundedSource;
+      }
+
+      @Override
+      public boolean start() throws IOException {
+        return boundedReader.start();
+      }
+
+      @Override
+      public boolean advance() throws IOException {
+        return boundedReader.advance();
+      }
+
+      @Override
+      public T getCurrent() throws NoSuchElementException {
+        return boundedReader.getCurrent();
+      }
+
+      @Override
+      public void close() throws IOException {
+        boundedReader.close();
+      }
+
+      @Override
+      @Nullable
+      public BoundedSource<T> splitAtFraction(double fraction) {
+        return null;
+      }
+
+      @Override
+      @Nullable
+      public Double getFractionConsumed() {
+        return boundedReader.getFractionConsumed();
+      }
+
+      @Override
+      public long getSplitPointsConsumed() {
+        return boundedReader.getSplitPointsConsumed();
+      }
+
+      @Override
+      public long getSplitPointsRemaining() {
+        return boundedReader.getSplitPointsRemaining();
+      }
+
+      @Override
+      public Instant getCurrentTimestamp() throws NoSuchElementException {
+        return boundedReader.getCurrentTimestamp();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c8a6546/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SourceTestUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SourceTestUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SourceTestUtilsTest.java
new file mode 100644
index 0000000..f2b332b
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SourceTestUtilsTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+import com.google.common.collect.Sets;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Tests for {@link SourceTestUtils}.
+ */
+@RunWith(JUnit4.class)
+public class SourceTestUtilsTest {
+
+  @Test
+  public void testToUnsplittableSource() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    BoundedSource<Long> baseSource = CountingSource.upTo(100);
+    BoundedSource<Long> unsplittableSource = SourceTestUtils.toUnsplittableSource(baseSource);
+    List<?> splits = unsplittableSource.splitIntoBundles(1, options);
+    assertEquals(splits.size(), 1);
+    assertEquals(splits.get(0), unsplittableSource);
+
+    BoundedReader<Long> unsplittableReader = unsplittableSource.createReader(options);
+    assertEquals(0, unsplittableReader.getFractionConsumed(), 1e-15);
+
+    Set<Long> expected = Sets.newHashSet(SourceTestUtils.readFromSource(baseSource, options));
+    Set<Long> actual = Sets.newHashSet();
+    actual.addAll(SourceTestUtils.readNItemsFromUnstartedReader(unsplittableReader, 40));
+    assertNull(unsplittableReader.splitAtFraction(0.5));
+    actual.addAll(SourceTestUtils.readRemainingFromReader(unsplittableReader, true /* started */));
+    assertEquals(1, unsplittableReader.getFractionConsumed(), 1e-15);
+
+    assertEquals(100, actual.size());
+    assertEquals(Sets.newHashSet(expected), Sets.newHashSet(actual));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c8a6546/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index ce04467..e61dcca 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1123,9 +1123,9 @@ public class BigQueryIO {
         BoundedSource<T> boundedSource,
         SerializableFunction<T, V> function,
         Coder<V> outputCoder) {
-      this.boundedSource = boundedSource;
-      this.function = function;
-      this.outputCoder = outputCoder;
+      this.boundedSource = checkNotNull(boundedSource, "boundedSource");
+      this.function = checkNotNull(function, "function");
+      this.outputCoder = checkNotNull(outputCoder, "outputCoder");
     }
 
     @Override
@@ -1170,7 +1170,7 @@ public class BigQueryIO {
       private final BoundedReader<T> boundedReader;
 
       private TransformingReader(BoundedReader<T> boundedReader) {
-        this.boundedReader = boundedReader;
+        this.boundedReader = checkNotNull(boundedReader, "boundedReader");
       }
 
       @Override
@@ -1201,8 +1201,8 @@ public class BigQueryIO {
 
       @Override
       public synchronized BoundedSource<V> splitAtFraction(double fraction) {
-        return new TransformingSource<>(
-            boundedReader.splitAtFraction(fraction), function, outputCoder);
+        BoundedSource<T> split = boundedReader.splitAtFraction(fraction);
+        return split == null ? null : new TransformingSource<>(split, function, outputCoder);
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c8a6546/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index fcaa054..ca60696 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -1216,6 +1216,37 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
+  public void testTransformingSourceUnsplittable() throws Exception {
+    int numElements = 10000;
+    @SuppressWarnings("deprecation")
+    BoundedSource<Long> longSource =
+        SourceTestUtils.toUnsplittableSource(CountingSource.upTo(numElements));
+    SerializableFunction<Long, String> toStringFn =
+        new SerializableFunction<Long, String>() {
+          @Override
+          public String apply(Long input) {
+            return input.toString();
+          }
+        };
+    BoundedSource<String> stringSource =
+        new TransformingSource<>(longSource, toStringFn, StringUtf8Coder.of());
+
+    List<String> expected = Lists.newArrayList();
+    for (int i = 0; i < numElements; i++) {
+      expected.add(String.valueOf(i));
+    }
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    Assert.assertThat(
+        SourceTestUtils.readFromSource(stringSource, options), CoreMatchers.is(expected));
+    SourceTestUtils.assertSplitAtFractionBehavior(
+        stringSource, 100, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
+
+    SourceTestUtils.assertSourcesEqualReferenceSource(
+        stringSource, stringSource.splitIntoBundles(100, options), options);
+  }
+
+  @Test
   @Category(RunnableOnService.class)
   public void testPassThroughThenCleanup() throws Exception {
     Pipeline p = TestPipeline.create();