You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/22 03:21:29 UTC

[1/3] incubator-beam git commit: Factor common setup in EncodabilityEnforcementFactoryTest

Repository: incubator-beam
Updated Branches:
  refs/heads/master 640e2588f -> a5548f915


Factor common setup in EncodabilityEnforcementFactoryTest


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/59fd4b3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/59fd4b3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/59fd4b3c

Branch: refs/heads/master
Commit: 59fd4b3cd51ca5908dc4ca0f5249c4c59399e1cf
Parents: 61988f3
Author: Thomas Groh <tg...@google.com>
Authored: Thu Apr 21 16:08:20 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Apr 21 18:20:54 2016 -0700

----------------------------------------------------------------------
 .../EncodabilityEnforcementFactoryTest.java     | 43 ++++++++------------
 1 file changed, 17 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59fd4b3c/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
index cade02b..8ed2684 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.runners.inprocess;
 import static org.hamcrest.Matchers.isA;
 
 import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
@@ -54,16 +55,9 @@ public class EncodabilityEnforcementFactoryTest {
 
   @Test
   public void encodeFailsThrows() {
-    TestPipeline p = TestPipeline.create();
-    PCollection<Record> unencodable =
-        p.apply(Create.<Record>of().withCoder(new RecordNoEncodeCoder()));
-    AppliedPTransform<?, ?, ?> consumer =
-        unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
-
     WindowedValue<Record> record = WindowedValue.valueInGlobalWindow(new Record());
-    CommittedBundle<Record> input =
-        bundleFactory.createRootBundle(unencodable).add(record).commit(Instant.now());
-    ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
+
+    ModelEnforcement<Record> enforcement = createEnforcement(new RecordNoEncodeCoder(), record);
 
     thrown.expect(UserCodeException.class);
     thrown.expectCause(isA(CoderException.class));
@@ -73,16 +67,9 @@ public class EncodabilityEnforcementFactoryTest {
 
   @Test
   public void decodeFailsThrows() {
-    TestPipeline p = TestPipeline.create();
-    PCollection<Record> unencodable =
-        p.apply(Create.of(new Record()).withCoder(new RecordNoDecodeCoder()));
-    AppliedPTransform<?, ?, ?> consumer =
-        unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
     WindowedValue<Record> record = WindowedValue.valueInGlobalWindow(new Record());
 
-    CommittedBundle<Record> input =
-        bundleFactory.createRootBundle(unencodable).add(record).commit(Instant.now());
-    ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
+    ModelEnforcement<Record> enforcement = createEnforcement(new RecordNoDecodeCoder(), record);
 
     thrown.expect(UserCodeException.class);
     thrown.expectCause(isA(CoderException.class));
@@ -92,12 +79,6 @@ public class EncodabilityEnforcementFactoryTest {
 
   @Test
   public void consistentWithEqualsStructuralValueNotEqualThrows() {
-    TestPipeline p = TestPipeline.create();
-    PCollection<Record> unencodable =
-        p.apply(Create.of(new Record()).withCoder(new RecordStructuralValueCoder()));
-    AppliedPTransform<?, ?, ?> consumer =
-        unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
-
     WindowedValue<Record> record =
         WindowedValue.<Record>valueInGlobalWindow(
             new Record() {
@@ -107,9 +88,8 @@ public class EncodabilityEnforcementFactoryTest {
               }
             });
 
-    CommittedBundle<Record> input =
-        bundleFactory.createRootBundle(unencodable).add(record).commit(Instant.now());
-    ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
+    ModelEnforcement<Record> enforcement =
+        createEnforcement(new RecordStructuralValueCoder(), record);
 
     thrown.expect(UserCodeException.class);
     thrown.expectCause(isA(IllegalArgumentException.class));
@@ -143,6 +123,17 @@ public class EncodabilityEnforcementFactoryTest {
         Collections.<CommittedBundle<?>>emptyList());
   }
 
+  private <T> ModelEnforcement<T> createEnforcement(Coder<T> coder, WindowedValue<T> record) {
+    TestPipeline p = TestPipeline.create();
+    PCollection<T> unencodable = p.apply(Create.<T>of().withCoder(coder));
+    AppliedPTransform<?, ?, ?> consumer =
+        unencodable.apply(Count.<T>globally()).getProducingTransformInternal();
+    CommittedBundle<T> input =
+        bundleFactory.createRootBundle(unencodable).add(record).commit(Instant.now());
+    ModelEnforcement<T> enforcement = factory.forBundle(input, consumer);
+    return enforcement;
+  }
+
   @Test
   public void structurallyEqualResultsSucceeds() {
     TestPipeline p = TestPipeline.create();


[2/3] incubator-beam git commit: Implement Create as an OffsetBasedSource

Posted by dh...@apache.org.
Implement Create as an OffsetBasedSource

This removes the requirement to implement a primitive Create for runners
that support reads from a bounded source.

Remove Dataflow Runner references to Create.Values


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/61988f34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/61988f34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/61988f34

Branch: refs/heads/master
Commit: 61988f34dc4481bb91efcced7a37abf15b420ffa
Parents: 640e258
Author: Thomas Groh <tg...@google.com>
Authored: Thu Apr 14 15:22:38 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Apr 21 18:20:54 2016 -0700

----------------------------------------------------------------------
 .../sdk/runners/DataflowPipelineTranslator.java |  42 ----
 .../sdk/runners/DataflowPipelineRunnerTest.java |  12 +-
 .../runners/DataflowPipelineTranslatorTest.java |   4 +-
 .../sdk/runners/inprocess/InProcessCreate.java  | 236 -------------------
 .../inprocess/InProcessPipelineRunner.java      |   3 -
 .../org/apache/beam/sdk/transforms/Create.java  | 198 ++++++++++++----
 .../beam/sdk/runners/TransformTreeTest.java     |   3 +-
 .../EncodabilityEnforcementFactoryTest.java     |   2 +-
 .../runners/inprocess/InProcessCreateTest.java  | 236 -------------------
 .../apache/beam/sdk/transforms/CreateTest.java  | 134 +++++++++++
 10 files changed, 299 insertions(+), 571 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/61988f34/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineTranslator.java
index 4e60545..5c0745f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineTranslator.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.runners;
 
-import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
 import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
 import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
 import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray;
@@ -34,7 +33,6 @@ import static com.google.common.base.Preconditions.checkArgument;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.io.BigQueryIO;
 import org.apache.beam.sdk.io.PubsubIO;
@@ -47,7 +45,6 @@ import org.apache.beam.sdk.runners.dataflow.PubsubIOTranslator;
 import org.apache.beam.sdk.runners.dataflow.ReadTranslator;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -845,45 +842,6 @@ public class DataflowPipelineTranslator {
         });
 
     registerTransformTranslator(
-        Create.Values.class,
-        new TransformTranslator<Create.Values>() {
-          @Override
-          public void translate(
-              Create.Values transform,
-              TranslationContext context) {
-            createHelper(transform, context);
-          }
-
-          private <T> void createHelper(
-              Create.Values<T> transform,
-              TranslationContext context) {
-            context.addStep(transform, "CreateCollection");
-
-            Coder<T> coder = context.getOutput(transform).getCoder();
-            List<CloudObject> elements = new LinkedList<>();
-            for (T elem : transform.getElements()) {
-              byte[] encodedBytes;
-              try {
-                encodedBytes = encodeToByteArray(coder, elem);
-              } catch (CoderException exn) {
-                // TODO: Put in better element printing:
-                // truncate if too long.
-                throw new IllegalArgumentException(
-                    "Unable to encode element '" + elem + "' of transform '" + transform
-                    + "' using coder '" + coder + "'.",
-                    exn);
-              }
-              String encodedJson = byteArrayToJsonString(encodedBytes);
-              assert Arrays.equals(encodedBytes,
-                                   jsonStringToByteArray(encodedJson));
-              elements.add(CloudObject.forString(encodedJson));
-            }
-            context.addInput(PropertyNames.ELEMENT, elements);
-            context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
-          }
-        });
-
-    registerTransformTranslator(
         Flatten.FlattenPCollectionList.class,
         new TransformTranslator<Flatten.FlattenPCollectionList>() {
           @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/61988f34/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java
index 8b024fb..6949128 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java
@@ -21,6 +21,7 @@ import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.startsWith;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
@@ -840,9 +841,16 @@ public class DataflowPipelineRunnerTest {
     CompositeTransformRecorder recorder = new CompositeTransformRecorder();
     p.traverseTopologically(recorder);
 
-    assertThat("Expected to have seen CreateTimestamped composite transform.",
+    // The recorder will also have seen a Create.Values composite as well, but we can't obtain that
+    // transform.
+    assertThat(
+        "Expected to have seen CreateTimestamped composite transform.",
         recorder.getCompositeTransforms(),
-        Matchers.<PTransform<?, ?>>contains(transform));
+        hasItem(transform));
+    assertThat(
+        "Expected to have two composites, CreateTimestamped and Create.Values",
+        recorder.getCompositeTransforms(),
+        hasItem(Matchers.<PTransform<?, ?>>isA((Class) Create.Values.class)));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/61988f34/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
index 0d58601..a62f550 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
@@ -751,7 +751,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     assertEquals(2, steps.size());
 
     Step createStep = steps.get(0);
-    assertEquals("CreateCollection", createStep.getKind());
+    assertEquals("ParallelRead", createStep.getKind());
 
     Step collectionToSingletonStep = steps.get(1);
     assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
@@ -783,7 +783,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     assertEquals(2, steps.size());
 
     Step createStep = steps.get(0);
-    assertEquals("CreateCollection", createStep.getKind());
+    assertEquals("ParallelRead", createStep.getKind());
 
     Step collectionToSingletonStep = steps.get(1);
     assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/61988f34/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessCreate.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessCreate.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessCreate.java
deleted file mode 100644
index c29d5ce..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessCreate.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.OffsetBasedSource;
-import org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.Create.Values;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import javax.annotation.Nullable;
-
-/**
- * An in-process implementation of the {@link Values Create.Values} {@link PTransform}, implemented
- * using a {@link BoundedSource}.
- *
- * The coder is inferred via the {@link Values#getDefaultOutputCoder(PInput)} method on the original
- * transform.
- */
-class InProcessCreate<T> extends ForwardingPTransform<PInput, PCollection<T>> {
-  private final Create.Values<T> original;
-
-  /**
-   * A {@link PTransformOverrideFactory} for {@link InProcessCreate}.
-   */
-  public static class InProcessCreateOverrideFactory implements PTransformOverrideFactory {
-    @Override
-    public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
-        PTransform<InputT, OutputT> transform) {
-      if (transform instanceof Create.Values) {
-        @SuppressWarnings("unchecked")
-        PTransform<InputT, OutputT> override =
-            (PTransform<InputT, OutputT>) from((Create.Values<OutputT>) transform);
-        return override;
-      }
-      return transform;
-    }
-  }
-
-  public static <T> InProcessCreate<T> from(Create.Values<T> original) {
-    return new InProcessCreate<>(original);
-  }
-
-  private InProcessCreate(Values<T> original) {
-    this.original = original;
-  }
-
-  @Override
-  public PCollection<T> apply(PInput input) {
-    Coder<T> elementCoder;
-    try {
-      elementCoder = original.getDefaultOutputCoder(input);
-    } catch (CannotProvideCoderException e) {
-      throw new IllegalArgumentException(
-          "Unable to infer a coder and no Coder was specified. "
-          + "Please set a coder by invoking Create.withCoder() explicitly.",
-          e);
-    }
-    InMemorySource<T> source;
-    try {
-      source = InMemorySource.fromIterable(original.getElements(), elementCoder);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    PCollection<T> result = input.getPipeline().apply(Read.from(source));
-    result.setCoder(elementCoder);
-    return result;
-  }
-
-  @Override
-  public PTransform<PInput, PCollection<T>> delegate() {
-    return original;
-  }
-
-  @VisibleForTesting
-  static class InMemorySource<T> extends OffsetBasedSource<T> {
-    private final List<byte[]> allElementsBytes;
-    private final long totalSize;
-    private final Coder<T> coder;
-
-    public static <T> InMemorySource<T> fromIterable(Iterable<T> elements, Coder<T> elemCoder)
-        throws CoderException, IOException {
-      ImmutableList.Builder<byte[]> allElementsBytes = ImmutableList.builder();
-      long totalSize = 0L;
-      for (T element : elements) {
-        byte[] bytes = CoderUtils.encodeToByteArray(elemCoder, element);
-        allElementsBytes.add(bytes);
-        totalSize += bytes.length;
-      }
-      return new InMemorySource<>(allElementsBytes.build(), totalSize, elemCoder);
-    }
-
-    /**
-     * Create a new source with the specified bytes. The new source owns the input element bytes,
-     * which must not be modified after this constructor is called.
-     */
-    private InMemorySource(List<byte[]> elementBytes, long totalSize, Coder<T> coder) {
-      super(0, elementBytes.size(), 1);
-      this.allElementsBytes = ImmutableList.copyOf(elementBytes);
-      this.totalSize = totalSize;
-      this.coder = coder;
-    }
-
-    @Override
-    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
-      return totalSize;
-    }
-
-    @Override
-    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
-      return false;
-    }
-
-    @Override
-    public BoundedSource.BoundedReader<T> createReader(PipelineOptions options) throws IOException {
-      return new BytesReader<>(this);
-    }
-
-    @Override
-    public void validate() {}
-
-    @Override
-    public Coder<T> getDefaultOutputCoder() {
-      return coder;
-    }
-
-    @Override
-    public long getMaxEndOffset(PipelineOptions options) throws Exception {
-      return allElementsBytes.size();
-    }
-
-    @Override
-    public OffsetBasedSource<T> createSourceForSubrange(long start, long end) {
-      List<byte[]> primaryElems = allElementsBytes.subList((int) start, (int) end);
-      long primarySizeEstimate =
-          (long) (totalSize * primaryElems.size() / (double) allElementsBytes.size());
-      return new InMemorySource<>(primaryElems, primarySizeEstimate, coder);
-    }
-
-    @Override
-    public long getBytesPerOffset() {
-      if (allElementsBytes.size() == 0) {
-        return 0L;
-      }
-      return totalSize / allElementsBytes.size();
-    }
-  }
-
-  private static class BytesReader<T> extends OffsetBasedReader<T> {
-    private int index;
-    /**
-     * Use an optional to distinguish between null next element (as Optional.absent()) and no next
-     * element (next is null).
-     */
-    @Nullable private Optional<T> next;
-
-    public BytesReader(InMemorySource<T> source) {
-      super(source);
-      index = -1;
-    }
-
-    @Override
-    @Nullable
-    public T getCurrent() throws NoSuchElementException {
-      if (next == null) {
-        throw new NoSuchElementException();
-      }
-      return next.orNull();
-    }
-
-    @Override
-    public void close() throws IOException {}
-
-    @Override
-    protected long getCurrentOffset() {
-      return index;
-    }
-
-    @Override
-    protected boolean startImpl() throws IOException {
-      return advanceImpl();
-    }
-
-    @Override
-    public synchronized InMemorySource<T> getCurrentSource() {
-      return (InMemorySource<T>) super.getCurrentSource();
-    }
-
-    @Override
-    protected boolean advanceImpl() throws IOException {
-      InMemorySource<T> source = getCurrentSource();
-      index++;
-      if (index >= source.allElementsBytes.size()) {
-        return false;
-      }
-      next =
-          Optional.fromNullable(
-              CoderUtils.decodeFromByteArray(
-                  source.coder, source.allElementsBytes.get(index)));
-      return true;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/61988f34/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java
index 6cc35fb..7c28238 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java
@@ -30,11 +30,9 @@ import org.apache.beam.sdk.runners.AggregatorValues;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly;
 import org.apache.beam.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOverrideFactory;
-import org.apache.beam.sdk.runners.inprocess.InProcessCreate.InProcessCreateOverrideFactory;
 import org.apache.beam.sdk.runners.inprocess.ViewEvaluatorFactory.InProcessViewOverrideFactory;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -83,7 +81,6 @@ public class InProcessPipelineRunner
   private static Map<Class<? extends PTransform>, PTransformOverrideFactory>
       defaultTransformOverrides =
           ImmutableMap.<Class<? extends PTransform>, PTransformOverrideFactory>builder()
-              .put(Create.Values.class, new InProcessCreateOverrideFactory())
               .put(GroupByKey.class, new InProcessGroupByKeyOverrideFactory())
               .put(CreatePCollectionView.class, new InProcessViewOverrideFactory())
               .put(AvroIO.Write.Bound.class, new AvroIOShardedWriteFactory())

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/61988f34/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index 27fb39d..1bd4fb3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -20,33 +20,42 @@ package org.apache.beam.sdk.transforms;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
-import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.OffsetBasedSource;
+import org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 
 import org.joda.time.Instant;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Objects;
 
+import javax.annotation.Nullable;
+
 /**
  * {@code Create<T>} takes a collection of elements of type {@code T}
  * known when the pipeline is constructed and returns a
@@ -237,12 +246,13 @@ public class Create<T> {
     public PCollection<T> apply(PInput input) {
       try {
         Coder<T> coder = getDefaultOutputCoder(input);
-        return PCollection
-            .<T>createPrimitiveOutputInternal(
-                input.getPipeline(),
-                WindowingStrategy.globalDefault(),
-                IsBounded.BOUNDED)
-            .setCoder(coder);
+        try {
+          CreateSource<T> source = CreateSource.fromIterable(elems, coder);
+          return input.getPipeline().apply(Read.from(source));
+        } catch (IOException e) {
+          throw new RuntimeException(
+              String.format("Unable to apply Create %s using Coder %s.", this, coder), e);
+        }
       } catch (CannotProvideCoderException e) {
         throw new IllegalArgumentException("Unable to infer a coder and no Coder was specified. "
             + "Please set a coder by invoking Create.withCoder() explicitly.", e);
@@ -320,6 +330,136 @@ public class Create<T> {
       this.elems = elems;
       this.coder = coder;
     }
+
+    @VisibleForTesting
+    static class CreateSource<T> extends OffsetBasedSource<T> {
+      private final List<byte[]> allElementsBytes;
+      private final long totalSize;
+      private final Coder<T> coder;
+
+      public static <T> CreateSource<T> fromIterable(Iterable<T> elements, Coder<T> elemCoder)
+          throws CoderException, IOException {
+        ImmutableList.Builder<byte[]> allElementsBytes = ImmutableList.builder();
+        long totalSize = 0L;
+        for (T element : elements) {
+          byte[] bytes = CoderUtils.encodeToByteArray(elemCoder, element);
+          allElementsBytes.add(bytes);
+          totalSize += bytes.length;
+        }
+        return new CreateSource<>(allElementsBytes.build(), totalSize, elemCoder);
+      }
+
+      /**
+       * Create a new source with the specified bytes. The new source owns the input element bytes,
+       * which must not be modified after this constructor is called.
+       */
+      private CreateSource(List<byte[]> elementBytes, long totalSize, Coder<T> coder) {
+        super(0, elementBytes.size(), 1);
+        this.allElementsBytes = ImmutableList.copyOf(elementBytes);
+        this.totalSize = totalSize;
+        this.coder = coder;
+      }
+
+      @Override
+      public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+        return totalSize;
+      }
+
+      @Override
+      public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+        return false;
+      }
+
+      @Override
+      public BoundedSource.BoundedReader<T> createReader(PipelineOptions options)
+          throws IOException {
+        return new BytesReader<>(this);
+      }
+
+      @Override
+      public void validate() {}
+
+      @Override
+      public Coder<T> getDefaultOutputCoder() {
+        return coder;
+      }
+
+      @Override
+      public long getMaxEndOffset(PipelineOptions options) throws Exception {
+        return allElementsBytes.size();
+      }
+
+      @Override
+      public OffsetBasedSource<T> createSourceForSubrange(long start, long end) {
+        List<byte[]> primaryElems = allElementsBytes.subList((int) start, (int) end);
+        long primarySizeEstimate =
+            (long) (totalSize * primaryElems.size() / (double) allElementsBytes.size());
+        return new CreateSource<>(primaryElems, primarySizeEstimate, coder);
+      }
+
+      @Override
+      public long getBytesPerOffset() {
+        if (allElementsBytes.size() == 0) {
+          return 0L;
+        }
+        return totalSize / allElementsBytes.size();
+      }
+    }
+
+    private static class BytesReader<T> extends OffsetBasedReader<T> {
+      private int index;
+      /**
+       * Use an optional to distinguish between null next element (as Optional.absent()) and no next
+       * element (next is null).
+       */
+      @Nullable private Optional<T> next;
+
+      public BytesReader(CreateSource<T> source) {
+        super(source);
+        index = -1;
+      }
+
+      @Override
+      @Nullable
+      public T getCurrent() throws NoSuchElementException {
+        if (next == null) {
+          throw new NoSuchElementException();
+        }
+        return next.orNull();
+      }
+
+      @Override
+      public void close() throws IOException {}
+
+      @Override
+      protected long getCurrentOffset() {
+        return index;
+      }
+
+      @Override
+      protected boolean startImpl() throws IOException {
+        return advanceImpl();
+      }
+
+      @Override
+      public synchronized CreateSource<T> getCurrentSource() {
+        return (CreateSource<T>) super.getCurrentSource();
+      }
+
+      @Override
+      protected boolean advanceImpl() throws IOException {
+        CreateSource<T> source = getCurrentSource();
+        index++;
+        if (index >= source.allElementsBytes.size()) {
+          next = null;
+          return false;
+        }
+        next =
+            Optional.fromNullable(
+                CoderUtils.decodeFromByteArray(source.coder, source.allElementsBytes.get(index)));
+        return true;
+      }
+    }
   }
 
   /////////////////////////////////////////////////////////////////////////////
@@ -387,42 +527,4 @@ public class Create<T> {
       }
     }
   }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  static {
-    registerDefaultTransformEvaluator();
-  }
-
-  @SuppressWarnings({"rawtypes", "unchecked"})
-  private static void registerDefaultTransformEvaluator() {
-    DirectPipelineRunner.registerDefaultTransformEvaluator(
-        Create.Values.class,
-        new DirectPipelineRunner.TransformEvaluator<Create.Values>() {
-          @Override
-          public void evaluate(
-              Create.Values transform,
-              DirectPipelineRunner.EvaluationContext context) {
-            evaluateHelper(transform, context);
-          }
-        });
-  }
-
-  private static <T> void evaluateHelper(
-      Create.Values<T> transform,
-      DirectPipelineRunner.EvaluationContext context) {
-    // Convert the Iterable of elems into a List of elems.
-    List<T> listElems;
-    if (transform.elems instanceof Collection) {
-      Collection<T> collectionElems = (Collection<T>) transform.elems;
-      listElems = new ArrayList<>(collectionElems.size());
-    } else {
-      listElems = new ArrayList<>();
-    }
-    for (T elem : transform.elems) {
-      listElems.add(
-          context.ensureElementEncodable(context.getOutput(transform), elem));
-    }
-    context.setPCollection(context.getOutput(transform), listElems);
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/61988f34/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index 7690d2b..e4eb204 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -158,7 +158,8 @@ public class TransformTreeTest {
         // Pick is a composite, should not be visited here.
         assertThat(transform, not(instanceOf(Sample.SampleAny.class)));
         assertThat(transform, not(instanceOf(Write.Bound.class)));
-        if (transform instanceof Read.Bounded) {
+        if (transform instanceof Read.Bounded
+            && node.getEnclosingNode().getTransform() instanceof TextIO.Read.Bound) {
           assertTrue(visited.add(TransformsSeen.READ));
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/61988f34/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
index 85c4322..cade02b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
@@ -56,7 +56,7 @@ public class EncodabilityEnforcementFactoryTest {
   public void encodeFailsThrows() {
     TestPipeline p = TestPipeline.create();
     PCollection<Record> unencodable =
-        p.apply(Create.of(new Record()).withCoder(new RecordNoEncodeCoder()));
+        p.apply(Create.<Record>of().withCoder(new RecordNoEncodeCoder()));
     AppliedPTransform<?, ?, ?> consumer =
         unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/61988f34/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessCreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessCreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessCreateTest.java
deleted file mode 100644
index 5c63af1..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessCreateTest.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.NullableCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.inprocess.InProcessCreate.InMemorySource;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
-import org.apache.beam.sdk.testing.SourceTestUtils;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.common.collect.ImmutableList;
-
-import org.hamcrest.Matchers;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-/**
- * Tests for {@link InProcessCreate}.
- */
-@RunWith(JUnit4.class)
-public class InProcessCreateTest {
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testConvertsCreate() {
-    TestPipeline p = TestPipeline.create();
-    Create.Values<Integer> og = Create.of(1, 2, 3);
-
-    InProcessCreate<Integer> converted = InProcessCreate.from(og);
-
-    PAssert.that(p.apply(converted)).containsInAnyOrder(2, 1, 3);
-
-    p.run();
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testConvertsCreateWithNullElements() {
-    Create.Values<String> og =
-        Create.<String>of("foo", null, "spam", "ham", null, "eggs")
-            .withCoder(NullableCoder.of(StringUtf8Coder.of()));
-
-    InProcessCreate<String> converted = InProcessCreate.from(og);
-    TestPipeline p = TestPipeline.create();
-
-    PAssert.that(p.apply(converted))
-        .containsInAnyOrder(null, "foo", null, "spam", "ham", "eggs");
-
-    p.run();
-  }
-
-  static class Record implements Serializable {}
-
-  static class Record2 extends Record {}
-
-  @Test
-  public void testThrowsIllegalArgumentWhenCannotInferCoder() {
-    Create.Values<Record> og = Create.of(new Record(), new Record2());
-    InProcessCreate<Record> converted = InProcessCreate.from(og);
-
-    Pipeline p = TestPipeline.create();
-
-    // Create won't infer a default coder in this case.
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(Matchers.containsString("Unable to infer a coder"));
-
-    PCollection<Record> c = p.apply(converted);
-    p.run();
-
-    fail("Unexpectedly Inferred Coder " + c.getCoder());
-  }
-
-  /**
-   * An unserializable class to demonstrate encoding of elements.
-   */
-  private static class UnserializableRecord {
-    private final String myString;
-
-    private UnserializableRecord(String myString) {
-      this.myString = myString;
-    }
-
-    @Override
-    public int hashCode() {
-      return myString.hashCode();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      return myString.equals(((UnserializableRecord) o).myString);
-    }
-
-    static class UnserializableRecordCoder extends AtomicCoder<UnserializableRecord> {
-      private final Coder<String> stringCoder = StringUtf8Coder.of();
-
-      @Override
-      public void encode(
-          UnserializableRecord value,
-          OutputStream outStream,
-          org.apache.beam.sdk.coders.Coder.Context context)
-          throws CoderException, IOException {
-        stringCoder.encode(value.myString, outStream, context.nested());
-      }
-
-      @Override
-      public UnserializableRecord decode(
-          InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
-          throws CoderException, IOException {
-        return new UnserializableRecord(stringCoder.decode(inStream, context.nested()));
-      }
-    }
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testConvertsUnserializableElements() throws Exception {
-    List<UnserializableRecord> elements =
-        ImmutableList.of(
-            new UnserializableRecord("foo"),
-            new UnserializableRecord("bar"),
-            new UnserializableRecord("baz"));
-    InProcessCreate<UnserializableRecord> create =
-        InProcessCreate.from(
-            Create.of(elements).withCoder(new UnserializableRecord.UnserializableRecordCoder()));
-
-    TestPipeline p = TestPipeline.create();
-    PAssert.that(p.apply(create))
-        .containsInAnyOrder(
-            new UnserializableRecord("foo"),
-            new UnserializableRecord("bar"),
-            new UnserializableRecord("baz"));
-    p.run();
-  }
-
-  @Test
-  public void testSerializableOnUnserializableElements() throws Exception {
-    List<UnserializableRecord> elements =
-        ImmutableList.of(
-            new UnserializableRecord("foo"),
-            new UnserializableRecord("bar"),
-            new UnserializableRecord("baz"));
-    InMemorySource<UnserializableRecord> source =
-        InMemorySource.fromIterable(elements, new UnserializableRecord.UnserializableRecordCoder());
-    SerializableUtils.ensureSerializable(source);
-  }
-
-  @Test
-  public void testSplitIntoBundles() throws Exception {
-    InProcessCreate.InMemorySource<Integer> source =
-        InMemorySource.fromIterable(
-            ImmutableList.of(1, 2, 3, 4, 5, 6, 7, 8), BigEndianIntegerCoder.of());
-    PipelineOptions options = PipelineOptionsFactory.create();
-    List<? extends BoundedSource<Integer>> splitSources = source.splitIntoBundles(12, options);
-    assertThat(splitSources, hasSize(3));
-    SourceTestUtils.assertSourcesEqualReferenceSource(source, splitSources, options);
-  }
-
-  @Test
-  public void testDoesNotProduceSortedKeys() throws Exception {
-    InProcessCreate.InMemorySource<String> source =
-        InMemorySource.fromIterable(ImmutableList.of("spam", "ham", "eggs"), StringUtf8Coder.of());
-    assertThat(source.producesSortedKeys(PipelineOptionsFactory.create()), is(false));
-  }
-
-  @Test
-  public void testGetDefaultOutputCoderReturnsConstructorCoder() throws Exception {
-    Coder<Integer> coder = VarIntCoder.of();
-    InProcessCreate.InMemorySource<Integer> source =
-        InMemorySource.fromIterable(ImmutableList.of(1, 2, 3, 4, 5, 6, 7, 8), coder);
-
-    Coder<Integer> defaultCoder = source.getDefaultOutputCoder();
-    assertThat(defaultCoder, equalTo(coder));
-  }
-
-  @Test
-  public void testSplitAtFraction() throws Exception {
-    List<Integer> elements = new ArrayList<>();
-    Random random = new Random();
-    for (int i = 0; i < 25; i++) {
-      elements.add(random.nextInt());
-    }
-    InProcessCreate.InMemorySource<Integer> source =
-        InMemorySource.fromIterable(elements, VarIntCoder.of());
-
-    SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/61988f34/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index 393fede..2998489 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -22,19 +22,36 @@ import static org.apache.beam.sdk.TestUtils.LINES_ARRAY;
 import static org.apache.beam.sdk.TestUtils.NO_LINES;
 import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY;
 
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create.Values.CreateSource;
+import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
 
+import com.google.common.collect.ImmutableList;
+
 import org.hamcrest.Matchers;
 import org.joda.time.Instant;
 import org.junit.Rule;
@@ -44,11 +61,15 @@ import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Random;
 
 /**
  * Tests for Create.
@@ -142,6 +163,67 @@ public class CreateTest {
             TimestampedValue.of("a", new Instant(0)),
             TimestampedValue.of("b", new Instant(0)));
   }
+
+  /**
+   * An unserializable class to demonstrate encoding of elements.
+   *
+   * <p>It does not implement {@link Serializable}; the tests in this class pair it with
+   * {@link UnserializableRecordCoder}.
+   */
+  private static class UnserializableRecord {
+    private final String myString;
+
+    private UnserializableRecord(String myString) {
+      this.myString = myString;
+    }
+
+    @Override
+    public int hashCode() {
+      return myString.hashCode();
+    }
+
+    /**
+     * Two records are equal when they wrap equal strings. Returns {@code false} for
+     * {@code null} or for objects of another type instead of throwing, as the
+     * {@link Object#equals} contract requires.
+     */
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof UnserializableRecord)) {
+        return false;
+      }
+      return myString.equals(((UnserializableRecord) o).myString);
+    }
+
+    /** Readable form so assertion failures show the wrapped string. */
+    @Override
+    public String toString() {
+      return "UnserializableRecord{" + myString + "}";
+    }
+
+    /** Encodes a record as its wrapped string, delegating to {@link StringUtf8Coder}. */
+    static class UnserializableRecordCoder extends AtomicCoder<UnserializableRecord> {
+      private final Coder<String> stringCoder = StringUtf8Coder.of();
+
+      @Override
+      public void encode(
+          UnserializableRecord value,
+          OutputStream outStream,
+          Coder.Context context)
+          throws CoderException, IOException {
+        stringCoder.encode(value.myString, outStream, context.nested());
+      }
+
+      @Override
+      public UnserializableRecord decode(
+          InputStream inStream, Coder.Context context)
+          throws CoderException, IOException {
+        return new UnserializableRecord(stringCoder.decode(inStream, context.nested()));
+      }
+    }
+  }
+
+  /** Create can emit elements that are not Serializable when given an explicit coder. */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testCreateWithUnserializableElements() throws Exception {
+    TestPipeline pipeline = TestPipeline.create();
+    UnserializableRecord.UnserializableRecordCoder recordCoder =
+        new UnserializableRecord.UnserializableRecordCoder();
+    List<UnserializableRecord> records =
+        ImmutableList.of(
+            new UnserializableRecord("foo"),
+            new UnserializableRecord("bar"),
+            new UnserializableRecord("baz"));
+
+    PCollection<UnserializableRecord> created =
+        pipeline.apply(Create.of(records).withCoder(recordCoder));
+
+    PAssert.that(created)
+        .containsInAnyOrder(
+            new UnserializableRecord("foo"),
+            new UnserializableRecord("bar"),
+            new UnserializableRecord("baz"));
+    pipeline.run();
+  }
+
   private static class PrintTimestamps extends DoFn<String, String> {
     @Override
       public void processElement(ProcessContext c) {
@@ -239,4 +321,56 @@ public class CreateTest {
     assertEquals("Create.Values", Create.of(1, 2, 3).getName());
     assertEquals("Create.TimestampedValues", Create.timestamped(Collections.EMPTY_LIST).getName());
   }
+
+  @Test
+  public void testSourceIsSerializableWithUnserializableElements() throws Exception {
+    // The elements themselves are not Serializable; the source built from them must still be.
+    CreateSource<UnserializableRecord> source =
+        CreateSource.fromIterable(
+            ImmutableList.of(
+                new UnserializableRecord("foo"),
+                new UnserializableRecord("bar"),
+                new UnserializableRecord("baz")),
+            new UnserializableRecord.UnserializableRecordCoder());
+
+    SerializableUtils.ensureSerializable(source);
+  }
+
+  @Test
+  public void testSourceSplitIntoBundles() throws Exception {
+    // Eight big-endian ints with a desired bundle size of 12 bytes: expect three bundles that
+    // together read the same elements as the unsplit source.
+    List<Integer> values = ImmutableList.of(1, 2, 3, 4, 5, 6, 7, 8);
+    CreateSource<Integer> source = CreateSource.fromIterable(values, BigEndianIntegerCoder.of());
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    List<? extends BoundedSource<Integer>> bundles = source.splitIntoBundles(12, options);
+
+    assertThat(bundles, hasSize(3));
+    SourceTestUtils.assertSourcesEqualReferenceSource(source, bundles, options);
+  }
+
+  @Test
+  public void testSourceDoesNotProduceSortedKeys() throws Exception {
+    CreateSource<String> source =
+        CreateSource.fromIterable(ImmutableList.of("spam", "ham", "eggs"), StringUtf8Coder.of());
+    assertThat(source.producesSortedKeys(PipelineOptionsFactory.create()), is(false));
+  }
+
+  @Test
+  public void testSourceGetDefaultOutputCoderReturnsConstructorCoder() throws Exception {
+    // The coder passed to fromIterable is the one the source reports as its output coder.
+    Coder<Integer> constructorCoder = VarIntCoder.of();
+    CreateSource<Integer> source =
+        CreateSource.fromIterable(ImmutableList.of(1, 2, 3, 4, 5, 6, 7, 8), constructorCoder);
+
+    assertThat(source.getDefaultOutputCoder(), equalTo(constructorCoder));
+  }
+
+  /**
+   * Exhaustively checks dynamic splitting of a 25-element source. The element values come from
+   * a fixed-seed generator so any failure reproduces on every run.
+   */
+  @Test
+  public void testSourceSplitAtFraction() throws Exception {
+    // A fixed seed keeps this test deterministic; nextInt() still draws from the full int range.
+    Random random = new Random(20160421L);
+    List<Integer> elements = new ArrayList<>(25);
+    for (int i = 0; i < 25; i++) {
+      elements.add(random.nextInt());
+    }
+    CreateSource<Integer> source = CreateSource.fromIterable(elements, VarIntCoder.of());
+
+    SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create());
+  }
 }



[3/3] incubator-beam git commit: Closes #183

Posted by dh...@apache.org.
Closes #183


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a5548f91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a5548f91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a5548f91

Branch: refs/heads/master
Commit: a5548f915df1449ab4e2d7841ef6d696ae28cd5c
Parents: 640e258 59fd4b3
Author: Dan Halperin <dh...@google.com>
Authored: Thu Apr 21 18:20:55 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Apr 21 18:20:55 2016 -0700

----------------------------------------------------------------------
 .../sdk/runners/DataflowPipelineTranslator.java |  42 ----
 .../sdk/runners/DataflowPipelineRunnerTest.java |  12 +-
 .../runners/DataflowPipelineTranslatorTest.java |   4 +-
 .../sdk/runners/inprocess/InProcessCreate.java  | 236 -------------------
 .../inprocess/InProcessPipelineRunner.java      |   3 -
 .../org/apache/beam/sdk/transforms/Create.java  | 198 ++++++++++++----
 .../beam/sdk/runners/TransformTreeTest.java     |   3 +-
 .../EncodabilityEnforcementFactoryTest.java     |  43 ++--
 .../runners/inprocess/InProcessCreateTest.java  | 236 -------------------
 .../apache/beam/sdk/transforms/CreateTest.java  | 134 +++++++++++
 10 files changed, 315 insertions(+), 596 deletions(-)
----------------------------------------------------------------------