You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/22 03:21:30 UTC
[2/3] incubator-beam git commit: Implement Create as An
OffsetBasedSource
Implement Create as An OffsetBasedSource
This removes the requirement to implement a primitive Create for runners
that support Reads from a Bounded Source.
Remove Dataflow Runner references to Create.Values
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/61988f34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/61988f34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/61988f34
Branch: refs/heads/master
Commit: 61988f34dc4481bb91efcced7a37abf15b420ffa
Parents: 640e258
Author: Thomas Groh <tg...@google.com>
Authored: Thu Apr 14 15:22:38 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Apr 21 18:20:54 2016 -0700
----------------------------------------------------------------------
.../sdk/runners/DataflowPipelineTranslator.java | 42 ----
.../sdk/runners/DataflowPipelineRunnerTest.java | 12 +-
.../runners/DataflowPipelineTranslatorTest.java | 4 +-
.../sdk/runners/inprocess/InProcessCreate.java | 236 -------------------
.../inprocess/InProcessPipelineRunner.java | 3 -
.../org/apache/beam/sdk/transforms/Create.java | 198 ++++++++++++----
.../beam/sdk/runners/TransformTreeTest.java | 3 +-
.../EncodabilityEnforcementFactoryTest.java | 2 +-
.../runners/inprocess/InProcessCreateTest.java | 236 -------------------
.../apache/beam/sdk/transforms/CreateTest.java | 134 +++++++++++
10 files changed, 299 insertions(+), 571 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/61988f34/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineTranslator.java
index 4e60545..5c0745f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineTranslator.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.runners;
-import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray;
@@ -34,7 +33,6 @@ import static com.google.common.base.Preconditions.checkArgument;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.io.BigQueryIO;
import org.apache.beam.sdk.io.PubsubIO;
@@ -47,7 +45,6 @@ import org.apache.beam.sdk.runners.dataflow.PubsubIOTranslator;
import org.apache.beam.sdk.runners.dataflow.ReadTranslator;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
@@ -845,45 +842,6 @@ public class DataflowPipelineTranslator {
});
registerTransformTranslator(
- Create.Values.class,
- new TransformTranslator<Create.Values>() {
- @Override
- public void translate(
- Create.Values transform,
- TranslationContext context) {
- createHelper(transform, context);
- }
-
- private <T> void createHelper(
- Create.Values<T> transform,
- TranslationContext context) {
- context.addStep(transform, "CreateCollection");
-
- Coder<T> coder = context.getOutput(transform).getCoder();
- List<CloudObject> elements = new LinkedList<>();
- for (T elem : transform.getElements()) {
- byte[] encodedBytes;
- try {
- encodedBytes = encodeToByteArray(coder, elem);
- } catch (CoderException exn) {
- // TODO: Put in better element printing:
- // truncate if too long.
- throw new IllegalArgumentException(
- "Unable to encode element '" + elem + "' of transform '" + transform
- + "' using coder '" + coder + "'.",
- exn);
- }
- String encodedJson = byteArrayToJsonString(encodedBytes);
- assert Arrays.equals(encodedBytes,
- jsonStringToByteArray(encodedJson));
- elements.add(CloudObject.forString(encodedJson));
- }
- context.addInput(PropertyNames.ELEMENT, elements);
- context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
- }
- });
-
- registerTransformTranslator(
Flatten.FlattenPCollectionList.class,
new TransformTranslator<Flatten.FlattenPCollectionList>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/61988f34/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java
index 8b024fb..6949128 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java
@@ -21,6 +21,7 @@ import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.startsWith;
import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
@@ -840,9 +841,16 @@ public class DataflowPipelineRunnerTest {
CompositeTransformRecorder recorder = new CompositeTransformRecorder();
p.traverseTopologically(recorder);
- assertThat("Expected to have seen CreateTimestamped composite transform.",
+ // The recorder will also have seen a Create.Values composite as well, but we can't obtain that
+ // transform.
+ assertThat(
+ "Expected to have seen CreateTimestamped composite transform.",
recorder.getCompositeTransforms(),
- Matchers.<PTransform<?, ?>>contains(transform));
+ hasItem(transform));
+ assertThat(
+ "Expected to have two composites, CreateTimestamped and Create.Values",
+ recorder.getCompositeTransforms(),
+ hasItem(Matchers.<PTransform<?, ?>>isA((Class) Create.Values.class)));
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/61988f34/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
index 0d58601..a62f550 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
@@ -751,7 +751,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
assertEquals(2, steps.size());
Step createStep = steps.get(0);
- assertEquals("CreateCollection", createStep.getKind());
+ assertEquals("ParallelRead", createStep.getKind());
Step collectionToSingletonStep = steps.get(1);
assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
@@ -783,7 +783,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
assertEquals(2, steps.size());
Step createStep = steps.get(0);
- assertEquals("CreateCollection", createStep.getKind());
+ assertEquals("ParallelRead", createStep.getKind());
Step collectionToSingletonStep = steps.get(1);
assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/61988f34/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessCreate.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessCreate.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessCreate.java
deleted file mode 100644
index c29d5ce..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessCreate.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.OffsetBasedSource;
-import org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.Create.Values;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import javax.annotation.Nullable;
-
-/**
- * An in-process implementation of the {@link Values Create.Values} {@link PTransform}, implemented
- * using a {@link BoundedSource}.
- *
- * The coder is inferred via the {@link Values#getDefaultOutputCoder(PInput)} method on the original
- * transform.
- */
-class InProcessCreate<T> extends ForwardingPTransform<PInput, PCollection<T>> {
- private final Create.Values<T> original;
-
- /**
- * A {@link PTransformOverrideFactory} for {@link InProcessCreate}.
- */
- public static class InProcessCreateOverrideFactory implements PTransformOverrideFactory {
- @Override
- public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
- PTransform<InputT, OutputT> transform) {
- if (transform instanceof Create.Values) {
- @SuppressWarnings("unchecked")
- PTransform<InputT, OutputT> override =
- (PTransform<InputT, OutputT>) from((Create.Values<OutputT>) transform);
- return override;
- }
- return transform;
- }
- }
-
- public static <T> InProcessCreate<T> from(Create.Values<T> original) {
- return new InProcessCreate<>(original);
- }
-
- private InProcessCreate(Values<T> original) {
- this.original = original;
- }
-
- @Override
- public PCollection<T> apply(PInput input) {
- Coder<T> elementCoder;
- try {
- elementCoder = original.getDefaultOutputCoder(input);
- } catch (CannotProvideCoderException e) {
- throw new IllegalArgumentException(
- "Unable to infer a coder and no Coder was specified. "
- + "Please set a coder by invoking Create.withCoder() explicitly.",
- e);
- }
- InMemorySource<T> source;
- try {
- source = InMemorySource.fromIterable(original.getElements(), elementCoder);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- PCollection<T> result = input.getPipeline().apply(Read.from(source));
- result.setCoder(elementCoder);
- return result;
- }
-
- @Override
- public PTransform<PInput, PCollection<T>> delegate() {
- return original;
- }
-
- @VisibleForTesting
- static class InMemorySource<T> extends OffsetBasedSource<T> {
- private final List<byte[]> allElementsBytes;
- private final long totalSize;
- private final Coder<T> coder;
-
- public static <T> InMemorySource<T> fromIterable(Iterable<T> elements, Coder<T> elemCoder)
- throws CoderException, IOException {
- ImmutableList.Builder<byte[]> allElementsBytes = ImmutableList.builder();
- long totalSize = 0L;
- for (T element : elements) {
- byte[] bytes = CoderUtils.encodeToByteArray(elemCoder, element);
- allElementsBytes.add(bytes);
- totalSize += bytes.length;
- }
- return new InMemorySource<>(allElementsBytes.build(), totalSize, elemCoder);
- }
-
- /**
- * Create a new source with the specified bytes. The new source owns the input element bytes,
- * which must not be modified after this constructor is called.
- */
- private InMemorySource(List<byte[]> elementBytes, long totalSize, Coder<T> coder) {
- super(0, elementBytes.size(), 1);
- this.allElementsBytes = ImmutableList.copyOf(elementBytes);
- this.totalSize = totalSize;
- this.coder = coder;
- }
-
- @Override
- public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
- return totalSize;
- }
-
- @Override
- public boolean producesSortedKeys(PipelineOptions options) throws Exception {
- return false;
- }
-
- @Override
- public BoundedSource.BoundedReader<T> createReader(PipelineOptions options) throws IOException {
- return new BytesReader<>(this);
- }
-
- @Override
- public void validate() {}
-
- @Override
- public Coder<T> getDefaultOutputCoder() {
- return coder;
- }
-
- @Override
- public long getMaxEndOffset(PipelineOptions options) throws Exception {
- return allElementsBytes.size();
- }
-
- @Override
- public OffsetBasedSource<T> createSourceForSubrange(long start, long end) {
- List<byte[]> primaryElems = allElementsBytes.subList((int) start, (int) end);
- long primarySizeEstimate =
- (long) (totalSize * primaryElems.size() / (double) allElementsBytes.size());
- return new InMemorySource<>(primaryElems, primarySizeEstimate, coder);
- }
-
- @Override
- public long getBytesPerOffset() {
- if (allElementsBytes.size() == 0) {
- return 0L;
- }
- return totalSize / allElementsBytes.size();
- }
- }
-
- private static class BytesReader<T> extends OffsetBasedReader<T> {
- private int index;
- /**
- * Use an optional to distinguish between null next element (as Optional.absent()) and no next
- * element (next is null).
- */
- @Nullable private Optional<T> next;
-
- public BytesReader(InMemorySource<T> source) {
- super(source);
- index = -1;
- }
-
- @Override
- @Nullable
- public T getCurrent() throws NoSuchElementException {
- if (next == null) {
- throw new NoSuchElementException();
- }
- return next.orNull();
- }
-
- @Override
- public void close() throws IOException {}
-
- @Override
- protected long getCurrentOffset() {
- return index;
- }
-
- @Override
- protected boolean startImpl() throws IOException {
- return advanceImpl();
- }
-
- @Override
- public synchronized InMemorySource<T> getCurrentSource() {
- return (InMemorySource<T>) super.getCurrentSource();
- }
-
- @Override
- protected boolean advanceImpl() throws IOException {
- InMemorySource<T> source = getCurrentSource();
- index++;
- if (index >= source.allElementsBytes.size()) {
- return false;
- }
- next =
- Optional.fromNullable(
- CoderUtils.decodeFromByteArray(
- source.coder, source.allElementsBytes.get(index)));
- return true;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/61988f34/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java
index 6cc35fb..7c28238 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java
@@ -30,11 +30,9 @@ import org.apache.beam.sdk.runners.AggregatorValues;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly;
import org.apache.beam.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOverrideFactory;
-import org.apache.beam.sdk.runners.inprocess.InProcessCreate.InProcessCreateOverrideFactory;
import org.apache.beam.sdk.runners.inprocess.ViewEvaluatorFactory.InProcessViewOverrideFactory;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -83,7 +81,6 @@ public class InProcessPipelineRunner
private static Map<Class<? extends PTransform>, PTransformOverrideFactory>
defaultTransformOverrides =
ImmutableMap.<Class<? extends PTransform>, PTransformOverrideFactory>builder()
- .put(Create.Values.class, new InProcessCreateOverrideFactory())
.put(GroupByKey.class, new InProcessGroupByKeyOverrideFactory())
.put(CreatePCollectionView.class, new InProcessViewOverrideFactory())
.put(AvroIO.Write.Bound.class, new AvroIOShardedWriteFactory())
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/61988f34/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index 27fb39d..1bd4fb3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -20,33 +20,42 @@ package org.apache.beam.sdk.transforms;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
-import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.OffsetBasedSource;
+import org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
import org.apache.beam.sdk.values.TypeDescriptor;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.joda.time.Instant;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Objects;
+import javax.annotation.Nullable;
+
/**
* {@code Create<T>} takes a collection of elements of type {@code T}
* known when the pipeline is constructed and returns a
@@ -237,12 +246,13 @@ public class Create<T> {
public PCollection<T> apply(PInput input) {
try {
Coder<T> coder = getDefaultOutputCoder(input);
- return PCollection
- .<T>createPrimitiveOutputInternal(
- input.getPipeline(),
- WindowingStrategy.globalDefault(),
- IsBounded.BOUNDED)
- .setCoder(coder);
+ try {
+ CreateSource<T> source = CreateSource.fromIterable(elems, coder);
+ return input.getPipeline().apply(Read.from(source));
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format("Unable to apply Create %s using Coder %s.", this, coder), e);
+ }
} catch (CannotProvideCoderException e) {
throw new IllegalArgumentException("Unable to infer a coder and no Coder was specified. "
+ "Please set a coder by invoking Create.withCoder() explicitly.", e);
@@ -320,6 +330,136 @@ public class Create<T> {
this.elems = elems;
this.coder = coder;
}
+
+ @VisibleForTesting
+ static class CreateSource<T> extends OffsetBasedSource<T> {
+ private final List<byte[]> allElementsBytes;
+ private final long totalSize;
+ private final Coder<T> coder;
+
+ public static <T> CreateSource<T> fromIterable(Iterable<T> elements, Coder<T> elemCoder)
+ throws CoderException, IOException {
+ ImmutableList.Builder<byte[]> allElementsBytes = ImmutableList.builder();
+ long totalSize = 0L;
+ for (T element : elements) {
+ byte[] bytes = CoderUtils.encodeToByteArray(elemCoder, element);
+ allElementsBytes.add(bytes);
+ totalSize += bytes.length;
+ }
+ return new CreateSource<>(allElementsBytes.build(), totalSize, elemCoder);
+ }
+
+ /**
+ * Create a new source with the specified bytes. The new source owns the input element bytes,
+ * which must not be modified after this constructor is called.
+ */
+ private CreateSource(List<byte[]> elementBytes, long totalSize, Coder<T> coder) {
+ super(0, elementBytes.size(), 1);
+ this.allElementsBytes = ImmutableList.copyOf(elementBytes);
+ this.totalSize = totalSize;
+ this.coder = coder;
+ }
+
+ @Override
+ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+ return totalSize;
+ }
+
+ @Override
+ public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+ return false;
+ }
+
+ @Override
+ public BoundedSource.BoundedReader<T> createReader(PipelineOptions options)
+ throws IOException {
+ return new BytesReader<>(this);
+ }
+
+ @Override
+ public void validate() {}
+
+ @Override
+ public Coder<T> getDefaultOutputCoder() {
+ return coder;
+ }
+
+ @Override
+ public long getMaxEndOffset(PipelineOptions options) throws Exception {
+ return allElementsBytes.size();
+ }
+
+ @Override
+ public OffsetBasedSource<T> createSourceForSubrange(long start, long end) {
+ List<byte[]> primaryElems = allElementsBytes.subList((int) start, (int) end);
+ long primarySizeEstimate =
+ (long) (totalSize * primaryElems.size() / (double) allElementsBytes.size());
+ return new CreateSource<>(primaryElems, primarySizeEstimate, coder);
+ }
+
+ @Override
+ public long getBytesPerOffset() {
+ if (allElementsBytes.size() == 0) {
+ return 0L;
+ }
+ return totalSize / allElementsBytes.size();
+ }
+ }
+
+ private static class BytesReader<T> extends OffsetBasedReader<T> {
+ private int index;
+ /**
+ * Use an optional to distinguish between null next element (as Optional.absent()) and no next
+ * element (next is null).
+ */
+ @Nullable private Optional<T> next;
+
+ public BytesReader(CreateSource<T> source) {
+ super(source);
+ index = -1;
+ }
+
+ @Override
+ @Nullable
+ public T getCurrent() throws NoSuchElementException {
+ if (next == null) {
+ throw new NoSuchElementException();
+ }
+ return next.orNull();
+ }
+
+ @Override
+ public void close() throws IOException {}
+
+ @Override
+ protected long getCurrentOffset() {
+ return index;
+ }
+
+ @Override
+ protected boolean startImpl() throws IOException {
+ return advanceImpl();
+ }
+
+ @Override
+ public synchronized CreateSource<T> getCurrentSource() {
+ return (CreateSource<T>) super.getCurrentSource();
+ }
+
+ @Override
+ protected boolean advanceImpl() throws IOException {
+ CreateSource<T> source = getCurrentSource();
+ index++;
+ if (index >= source.allElementsBytes.size()) {
+ next = null;
+ return false;
+ }
+ next =
+ Optional.fromNullable(
+ CoderUtils.decodeFromByteArray(source.coder, source.allElementsBytes.get(index)));
+ return true;
+ }
+ }
}
/////////////////////////////////////////////////////////////////////////////
@@ -387,42 +527,4 @@ public class Create<T> {
}
}
}
-
- /////////////////////////////////////////////////////////////////////////////
-
- static {
- registerDefaultTransformEvaluator();
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- private static void registerDefaultTransformEvaluator() {
- DirectPipelineRunner.registerDefaultTransformEvaluator(
- Create.Values.class,
- new DirectPipelineRunner.TransformEvaluator<Create.Values>() {
- @Override
- public void evaluate(
- Create.Values transform,
- DirectPipelineRunner.EvaluationContext context) {
- evaluateHelper(transform, context);
- }
- });
- }
-
- private static <T> void evaluateHelper(
- Create.Values<T> transform,
- DirectPipelineRunner.EvaluationContext context) {
- // Convert the Iterable of elems into a List of elems.
- List<T> listElems;
- if (transform.elems instanceof Collection) {
- Collection<T> collectionElems = (Collection<T>) transform.elems;
- listElems = new ArrayList<>(collectionElems.size());
- } else {
- listElems = new ArrayList<>();
- }
- for (T elem : transform.elems) {
- listElems.add(
- context.ensureElementEncodable(context.getOutput(transform), elem));
- }
- context.setPCollection(context.getOutput(transform), listElems);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/61988f34/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index 7690d2b..e4eb204 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -158,7 +158,8 @@ public class TransformTreeTest {
// Pick is a composite, should not be visited here.
assertThat(transform, not(instanceOf(Sample.SampleAny.class)));
assertThat(transform, not(instanceOf(Write.Bound.class)));
- if (transform instanceof Read.Bounded) {
+ if (transform instanceof Read.Bounded
+ && node.getEnclosingNode().getTransform() instanceof TextIO.Read.Bound) {
assertTrue(visited.add(TransformsSeen.READ));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/61988f34/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
index 85c4322..cade02b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
@@ -56,7 +56,7 @@ public class EncodabilityEnforcementFactoryTest {
public void encodeFailsThrows() {
TestPipeline p = TestPipeline.create();
PCollection<Record> unencodable =
- p.apply(Create.of(new Record()).withCoder(new RecordNoEncodeCoder()));
+ p.apply(Create.<Record>of().withCoder(new RecordNoEncodeCoder()));
AppliedPTransform<?, ?, ?> consumer =
unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/61988f34/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessCreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessCreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessCreateTest.java
deleted file mode 100644
index 5c63af1..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessCreateTest.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.NullableCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.inprocess.InProcessCreate.InMemorySource;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
-import org.apache.beam.sdk.testing.SourceTestUtils;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.common.collect.ImmutableList;
-
-import org.hamcrest.Matchers;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-/**
- * Tests for {@link InProcessCreate}.
- */
-@RunWith(JUnit4.class)
-public class InProcessCreateTest {
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
- @Test
- @Category(RunnableOnService.class)
- public void testConvertsCreate() {
- TestPipeline p = TestPipeline.create();
- Create.Values<Integer> og = Create.of(1, 2, 3);
-
- InProcessCreate<Integer> converted = InProcessCreate.from(og);
-
- PAssert.that(p.apply(converted)).containsInAnyOrder(2, 1, 3);
-
- p.run();
- }
-
- @Test
- @Category(RunnableOnService.class)
- public void testConvertsCreateWithNullElements() {
- Create.Values<String> og =
- Create.<String>of("foo", null, "spam", "ham", null, "eggs")
- .withCoder(NullableCoder.of(StringUtf8Coder.of()));
-
- InProcessCreate<String> converted = InProcessCreate.from(og);
- TestPipeline p = TestPipeline.create();
-
- PAssert.that(p.apply(converted))
- .containsInAnyOrder(null, "foo", null, "spam", "ham", "eggs");
-
- p.run();
- }
-
- static class Record implements Serializable {}
-
- static class Record2 extends Record {}
-
- @Test
- public void testThrowsIllegalArgumentWhenCannotInferCoder() {
- Create.Values<Record> og = Create.of(new Record(), new Record2());
- InProcessCreate<Record> converted = InProcessCreate.from(og);
-
- Pipeline p = TestPipeline.create();
-
- // Create won't infer a default coder in this case.
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage(Matchers.containsString("Unable to infer a coder"));
-
- PCollection<Record> c = p.apply(converted);
- p.run();
-
- fail("Unexpectedly Inferred Coder " + c.getCoder());
- }
-
- /**
- * An unserializable class to demonstrate encoding of elements.
- */
- private static class UnserializableRecord {
- private final String myString;
-
- private UnserializableRecord(String myString) {
- this.myString = myString;
- }
-
- @Override
- public int hashCode() {
- return myString.hashCode();
- }
-
- @Override
- public boolean equals(Object o) {
- return myString.equals(((UnserializableRecord) o).myString);
- }
-
- static class UnserializableRecordCoder extends AtomicCoder<UnserializableRecord> {
- private final Coder<String> stringCoder = StringUtf8Coder.of();
-
- @Override
- public void encode(
- UnserializableRecord value,
- OutputStream outStream,
- org.apache.beam.sdk.coders.Coder.Context context)
- throws CoderException, IOException {
- stringCoder.encode(value.myString, outStream, context.nested());
- }
-
- @Override
- public UnserializableRecord decode(
- InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
- throws CoderException, IOException {
- return new UnserializableRecord(stringCoder.decode(inStream, context.nested()));
- }
- }
- }
-
- @Test
- @Category(RunnableOnService.class)
- public void testConvertsUnserializableElements() throws Exception {
- List<UnserializableRecord> elements =
- ImmutableList.of(
- new UnserializableRecord("foo"),
- new UnserializableRecord("bar"),
- new UnserializableRecord("baz"));
- InProcessCreate<UnserializableRecord> create =
- InProcessCreate.from(
- Create.of(elements).withCoder(new UnserializableRecord.UnserializableRecordCoder()));
-
- TestPipeline p = TestPipeline.create();
- PAssert.that(p.apply(create))
- .containsInAnyOrder(
- new UnserializableRecord("foo"),
- new UnserializableRecord("bar"),
- new UnserializableRecord("baz"));
- p.run();
- }
-
- @Test
- public void testSerializableOnUnserializableElements() throws Exception {
- List<UnserializableRecord> elements =
- ImmutableList.of(
- new UnserializableRecord("foo"),
- new UnserializableRecord("bar"),
- new UnserializableRecord("baz"));
- InMemorySource<UnserializableRecord> source =
- InMemorySource.fromIterable(elements, new UnserializableRecord.UnserializableRecordCoder());
- SerializableUtils.ensureSerializable(source);
- }
-
- @Test
- public void testSplitIntoBundles() throws Exception {
- InProcessCreate.InMemorySource<Integer> source =
- InMemorySource.fromIterable(
- ImmutableList.of(1, 2, 3, 4, 5, 6, 7, 8), BigEndianIntegerCoder.of());
- PipelineOptions options = PipelineOptionsFactory.create();
- List<? extends BoundedSource<Integer>> splitSources = source.splitIntoBundles(12, options);
- assertThat(splitSources, hasSize(3));
- SourceTestUtils.assertSourcesEqualReferenceSource(source, splitSources, options);
- }
-
- @Test
- public void testDoesNotProduceSortedKeys() throws Exception {
- InProcessCreate.InMemorySource<String> source =
- InMemorySource.fromIterable(ImmutableList.of("spam", "ham", "eggs"), StringUtf8Coder.of());
- assertThat(source.producesSortedKeys(PipelineOptionsFactory.create()), is(false));
- }
-
- @Test
- public void testGetDefaultOutputCoderReturnsConstructorCoder() throws Exception {
- Coder<Integer> coder = VarIntCoder.of();
- InProcessCreate.InMemorySource<Integer> source =
- InMemorySource.fromIterable(ImmutableList.of(1, 2, 3, 4, 5, 6, 7, 8), coder);
-
- Coder<Integer> defaultCoder = source.getDefaultOutputCoder();
- assertThat(defaultCoder, equalTo(coder));
- }
-
- @Test
- public void testSplitAtFraction() throws Exception {
- List<Integer> elements = new ArrayList<>();
- Random random = new Random();
- for (int i = 0; i < 25; i++) {
- elements.add(random.nextInt());
- }
- InProcessCreate.InMemorySource<Integer> source =
- InMemorySource.fromIterable(elements, VarIntCoder.of());
-
- SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/61988f34/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index 393fede..2998489 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -22,19 +22,36 @@ import static org.apache.beam.sdk.TestUtils.LINES_ARRAY;
import static org.apache.beam.sdk.TestUtils.NO_LINES;
import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create.Values.CreateSource;
+import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
+import com.google.common.collect.ImmutableList;
+
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Rule;
@@ -44,11 +61,15 @@ import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Random;
/**
* Tests for Create.
@@ -142,6 +163,67 @@ public class CreateTest {
TimestampedValue.of("a", new Instant(0)),
TimestampedValue.of("b", new Instant(0)));
}
+ /**
+  * A class that does not implement {@code Serializable}, used to demonstrate encoding of
+  * elements. Equality and hashing are based solely on the wrapped string.
+  */
+ private static class UnserializableRecord {
+   private final String myString;
+
+   private UnserializableRecord(String myString) {
+     this.myString = myString;
+   }
+
+   @Override
+   public int hashCode() {
+     return myString.hashCode();
+   }
+
+   /**
+    * Compares records by their wrapped string. Returns {@code false} for {@code null} and for
+    * objects of any other type instead of failing on the cast.
+    */
+   @Override
+   public boolean equals(Object o) {
+     if (!(o instanceof UnserializableRecord)) {
+       return false;
+     }
+     return myString.equals(((UnserializableRecord) o).myString);
+   }
+
+   /** Encodes and decodes a record as its wrapped string, delegating to a string coder. */
+   static class UnserializableRecordCoder extends AtomicCoder<UnserializableRecord> {
+     private final Coder<String> stringCoder = StringUtf8Coder.of();
+
+     @Override
+     public void encode(
+         UnserializableRecord value,
+         OutputStream outStream,
+         org.apache.beam.sdk.coders.Coder.Context context)
+         throws CoderException, IOException {
+       // Only the wrapped string is written, using the nested form of the caller's context.
+       stringCoder.encode(value.myString, outStream, context.nested());
+     }
+
+     @Override
+     public UnserializableRecord decode(
+         InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
+         throws CoderException, IOException {
+       // Mirror of encode: read the string in the nested context and re-wrap it.
+       return new UnserializableRecord(stringCoder.decode(inStream, context.nested()));
+     }
+   }
+ }
+
+ /**
+  * Applies {@link Create} with an explicit coder to elements that are not Java-serializable and
+  * asserts that all of them appear in the output.
+  */
+ @Test
+ @Category(RunnableOnService.class)
+ public void testCreateWithUnserializableElements() throws Exception {
+   TestPipeline pipeline = TestPipeline.create();
+   List<UnserializableRecord> records =
+       ImmutableList.of(
+           new UnserializableRecord("foo"),
+           new UnserializableRecord("bar"),
+           new UnserializableRecord("baz"));
+
+   PCollection<UnserializableRecord> output =
+       pipeline.apply(
+           Create.of(records).withCoder(new UnserializableRecord.UnserializableRecordCoder()));
+
+   PAssert.that(output)
+       .containsInAnyOrder(
+           new UnserializableRecord("foo"),
+           new UnserializableRecord("bar"),
+           new UnserializableRecord("baz"));
+   pipeline.run();
+ }
+
private static class PrintTimestamps extends DoFn<String, String> {
@Override
public void processElement(ProcessContext c) {
@@ -239,4 +321,56 @@ public class CreateTest {
assertEquals("Create.Values", Create.of(1, 2, 3).getName());
assertEquals("Create.TimestampedValues", Create.timestamped(Collections.EMPTY_LIST).getName());
}
+
+ /**
+  * Builds a {@code CreateSource} from elements that are not Java-serializable and checks that
+  * the source itself passes {@code SerializableUtils.ensureSerializable}.
+  */
+ @Test
+ public void testSourceIsSerializableWithUnserializableElements() throws Exception {
+   Coder<UnserializableRecord> recordCoder = new UnserializableRecord.UnserializableRecordCoder();
+   List<UnserializableRecord> records =
+       ImmutableList.of(
+           new UnserializableRecord("foo"),
+           new UnserializableRecord("bar"),
+           new UnserializableRecord("baz"));
+
+   CreateSource<UnserializableRecord> createSource =
+       CreateSource.fromIterable(records, recordCoder);
+   SerializableUtils.ensureSerializable(createSource);
+ }
+
+ /**
+  * Splits an eight-element source with a desired bundle size of 12 and asserts that three
+  * sub-sources result which together match the original source.
+  */
+ @Test
+ public void testSourceSplitIntoBundles() throws Exception {
+   PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
+   List<Integer> values = ImmutableList.of(1, 2, 3, 4, 5, 6, 7, 8);
+   CreateSource<Integer> reference =
+       CreateSource.fromIterable(values, BigEndianIntegerCoder.of());
+
+   List<? extends BoundedSource<Integer>> bundles =
+       reference.splitIntoBundles(12, pipelineOptions);
+
+   assertThat(bundles, hasSize(3));
+   SourceTestUtils.assertSourcesEqualReferenceSource(reference, bundles, pipelineOptions);
+ }
+
+ /** Asserts that a {@code CreateSource} reports that it does not produce sorted keys. */
+ @Test
+ public void testSourceDoesNotProduceSortedKeys() throws Exception {
+   List<String> words = ImmutableList.of("spam", "ham", "eggs");
+   CreateSource<String> createSource = CreateSource.fromIterable(words, StringUtf8Coder.of());
+
+   boolean sortedKeys = createSource.producesSortedKeys(PipelineOptionsFactory.create());
+   assertThat(sortedKeys, is(false));
+ }
+
+ /**
+  * Asserts that the default output coder of a {@code CreateSource} equals the coder that was
+  * passed to {@code fromIterable}.
+  */
+ @Test
+ public void testSourceGetDefaultOutputCoderReturnsConstructorCoder() throws Exception {
+   Coder<Integer> suppliedCoder = VarIntCoder.of();
+   List<Integer> values = ImmutableList.of(1, 2, 3, 4, 5, 6, 7, 8);
+   CreateSource<Integer> createSource = CreateSource.fromIterable(values, suppliedCoder);
+
+   assertThat(createSource.getDefaultOutputCoder(), equalTo(suppliedCoder));
+ }
+
+ /**
+  * Runs the exhaustive split-at-fraction check from {@code SourceTestUtils} against a
+  * {@code CreateSource} holding 25 pseudo-random integers.
+  */
+ @Test
+ public void testSourceSplitAtFraction() throws Exception {
+   // A fixed seed keeps the generated elements identical on every run, so that a failure
+   // can be reproduced instead of depending on whatever values one run happened to draw.
+   Random random = new Random(0L);
+   List<Integer> elements = new ArrayList<>();
+   for (int i = 0; i < 25; i++) {
+     elements.add(random.nextInt());
+   }
+   CreateSource<Integer> source = CreateSource.fromIterable(elements, VarIntCoder.of());
+
+   SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create());
+ }
}