You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2018/07/15 14:56:39 UTC

[beam] branch master updated: [BEAM-4432] Adding Sources to produce Synthetic output for Batch pipelines (#5519)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b5e8335  [BEAM-4432] Adding Sources to produce Synthetic output for Batch pipelines (#5519)
b5e8335 is described below

commit b5e8335d982ee69d9f788f65f27356cddd5293d1
Author: Pablo <pa...@users.noreply.github.com>
AuthorDate: Sun Jul 15 07:56:33 2018 -0700

    [BEAM-4432] Adding Sources to produce Synthetic output for Batch pipelines (#5519)
    
    * Adding Sources to produce Synthetic output for Batch pipelines
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   1 +
 sdks/java/io/synthetic/build.gradle                |  36 ++
 .../beam/sdk/io/synthetic/SyntheticBoundedIO.java  | 437 ++++++++++++++++++
 .../beam/sdk/io/synthetic/SyntheticOptions.java    | 355 +++++++++++++++
 .../beam/sdk/io/synthetic/SyntheticUtils.java      | 100 ++++
 .../apache/beam/sdk/io/synthetic/package-info.java |  19 +
 .../sdk/io/synthetic/SyntheticBoundedIOTest.java   | 215 +++++++++
 .../apache_beam/testing/synthetic_pipeline.py      | 502 +++++++++++++++++++++
 .../apache_beam/testing/synthetic_pipeline_test.py | 154 +++++++
 sdks/python/scripts/generate_pydoc.sh              |   1 +
 sdks/python/setup.py                               |   7 +-
 settings.gradle                                    |   2 +
 12 files changed, 1828 insertions(+), 1 deletion(-)

diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index da0a875..c92f23c 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -341,6 +341,7 @@ class BeamModulePlugin implements Plugin<Project> {
         commons_io_1x                               : "commons-io:commons-io:1.3.2",
         commons_io_2x                               : "commons-io:commons-io:2.5",
         commons_lang3                               : "org.apache.commons:commons-lang3:3.6",
+        commons_math3                               : "org.apache.commons:commons-math3:3.6.1",
         datastore_v1_proto_client                   : "com.google.cloud.datastore:datastore-v1-proto-client:1.4.0",
         datastore_v1_protos                         : "com.google.cloud.datastore:datastore-v1-protos:1.3.0",
         error_prone_annotations                     : "com.google.errorprone:error_prone_annotations:2.0.15",
diff --git a/sdks/java/io/synthetic/build.gradle b/sdks/java/io/synthetic/build.gradle
new file mode 100644
index 0000000..3d6be8e
--- /dev/null
+++ b/sdks/java/io/synthetic/build.gradle
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: org.apache.beam.gradle.BeamModulePlugin
+applyJavaNature()
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Synthetic"
+ext.summary = "Generators of Synthetic IO for Testing."
+
+dependencies {
+  compile library.java.joda_time
+  compile library.java.commons_math3
+  shadow library.java.jackson_core
+  shadow library.java.jackson_annotations
+  shadow library.java.jackson_databind
+  testCompile library.java.guava
+  testCompile library.java.junit
+  testCompile library.java.hamcrest_core
+  testCompile library.java.hamcrest_library
+  shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
+}
diff --git a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedIO.java b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedIO.java
new file mode 100644
index 0000000..d9f652a
--- /dev/null
+++ b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedIO.java
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.synthetic;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.commons.math3.stat.StatUtils.sum;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.google.common.base.MoreObjects;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.io.OffsetBasedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.math3.distribution.ConstantRealDistribution;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This {@link SyntheticBoundedIO} class provides a parameterizable batch custom source that is
+ * deterministic.
+ *
+ * <p>The {@link SyntheticBoundedSource} generates a {@link PCollection} of {@code KV<byte[],
+ * byte[]>}. A fraction of the generated records {@code KV<byte[], byte[]>} are associated with
+ * "hot" keys, which are uniformly distributed over a fixed number of hot keys. The remaining
+ * generated records are associated with "random" keys. Each record will be slowed down by a certain
+ * sleep time generated based on the specified sleep time distribution when the {@link
+ * SyntheticSourceReader} reads each record. The record {@code KV<byte[], byte[]>} is generated
+ * deterministically based on the record's position in the source, which enables repeatable
+ * execution for debugging. The SyntheticBoundedIO configurable parameters are defined in {@link
+ * SyntheticBoundedIO.SyntheticSourceOptions}.
+ *
+ * <p>To read a {@link PCollection} of {@code KV<byte[], byte[]>} from {@link SyntheticBoundedIO},
+ * use {@link SyntheticBoundedIO#readFrom} to construct the synthetic source with synthetic source
+ * options. See {@link SyntheticBoundedIO.SyntheticSourceOptions} for how to construct an instance.
+ * An example is below:
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ * SyntheticBoundedIO.SyntheticSourceOptions sso = ...;
+ *
+ * // Construct the synthetic input with synthetic source options.
+ * PCollection<KV<byte[], byte[]>> input = p.apply(SyntheticBoundedIO.readFrom(sso));
+ * }</pre>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class SyntheticBoundedIO {
+  /** Read from the synthetic source options. */
+  public static Read.Bounded<KV<byte[], byte[]>> readFrom(SyntheticSourceOptions options) {
+    checkNotNull(options, "Input synthetic source options should not be null.");
+    return Read.from(new SyntheticBoundedSource(options));
+  }
+
+  /** A {@link SyntheticBoundedSource} that reads {@code KV<byte[], byte[]>}. */
+  public static class SyntheticBoundedSource extends OffsetBasedSource<KV<byte[], byte[]>> {
+    private static final long serialVersionUID = 0;
+    private static final Logger LOG = LoggerFactory.getLogger(SyntheticBoundedSource.class);
+
+    private final SyntheticSourceOptions sourceOptions;
+
+    public SyntheticBoundedSource(SyntheticSourceOptions sourceOptions) {
+      this(0, sourceOptions.numRecords, sourceOptions);
+    }
+
+    SyntheticBoundedSource(long startOffset, long endOffset, SyntheticSourceOptions sourceOptions) {
+      super(startOffset, endOffset, 1);
+      this.sourceOptions = sourceOptions;
+      LOG.debug("Constructing {}", this); // SLF4J calls toString() only if DEBUG is enabled
+    }
+
+    @Override
+    public Coder<KV<byte[], byte[]>> getDefaultOutputCoder() {
+      return KvCoder.of(ByteArrayCoder.of(), ByteArrayCoder.of());
+    }
+
+    @Override
+    // TODO: test cases where the source size could not be estimated (i.e., return 0).
+    // TODO: test cases where the key size and value size might differ from record to record.
+    // The key size and value size might have their own distributions.
+    public long getBytesPerOffset() {
+      return sourceOptions.bytesPerRecord >= 0
+          ? sourceOptions.bytesPerRecord
+          : sourceOptions.keySizeBytes + sourceOptions.valueSizeBytes;
+    }
+
+    @Override
+    public void validate() {
+      super.validate();
+      sourceOptions.validate();
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("options", sourceOptions)
+          .add("offsetRange", "[" + getStartOffset() + ", " + getEndOffset() + ")")
+          .toString();
+    }
+
+    @Override
+    public final SyntheticBoundedSource createSourceForSubrange(long start, long end) {
+      checkArgument(
+          start >= getStartOffset(),
+          "Start offset value "
+              + start
+              + " of the subrange cannot be smaller than the start offset value "
+              + getStartOffset()
+              + " of the parent source");
+      checkArgument(
+          end <= getEndOffset(),
+          "End offset value "
+              + end
+              + " of the subrange cannot be larger than the end offset value "
+              + getEndOffset()
+              + " of the parent source");
+
+      return new SyntheticBoundedSource(start, end, sourceOptions);
+    }
+
+    @Override
+    public long getMaxEndOffset(PipelineOptions options) {
+      return getEndOffset();
+    }
+
+    @Override
+    public SyntheticSourceReader createReader(PipelineOptions pipelineOptions) {
+      return new SyntheticSourceReader(this);
+    }
+
+    @Override
+    public List<SyntheticBoundedSource> split(long desiredBundleSizeBytes, PipelineOptions options)
+        throws Exception {
+      // Choose number of bundles either based on explicit parameter,
+      // or based on size and hints.
+      int desiredNumBundles =
+          (sourceOptions.forceNumInitialBundles == null)
+              ? ((int) Math.ceil(1.0 * getEstimatedSizeBytes(options) / desiredBundleSizeBytes))
+              : sourceOptions.forceNumInitialBundles;
+
+      List<SyntheticBoundedSource> res =
+          generateBundleSizes(desiredNumBundles)
+              .stream()
+              .map(
+                  offsetRange ->
+                      createSourceForSubrange(offsetRange.getFrom(), offsetRange.getTo()))
+              .collect(Collectors.toList());
+      LOG.info("Split into {} bundles of sizes: {}", res.size(), res);
+      return res;
+    }
+
+    private List<OffsetRange> generateBundleSizes(int desiredNumBundles) {
+      List<OffsetRange> result = new ArrayList<>();
+
+      // Generate relative bundle sizes using the given distribution.
+      double[] relativeSizes = new double[desiredNumBundles];
+      for (int i = 0; i < relativeSizes.length; ++i) {
+        relativeSizes[i] =
+            sourceOptions.bundleSizeDistribution.sample(
+                sourceOptions.hashFunction.hashInt(i).asLong());
+      }
+
+      // Generate offset ranges proportional to the relative sizes.
+      double s = sum(relativeSizes);
+      long startOffset = getStartOffset();
+      double sizeSoFar = 0;
+      for (int i = 0; i < relativeSizes.length; ++i) {
+        sizeSoFar += relativeSizes[i];
+        long endOffset =
+            (i == relativeSizes.length - 1)
+                ? getEndOffset()
+                : (long) (getStartOffset() + sizeSoFar * (getEndOffset() - getStartOffset()) / s);
+        if (startOffset != endOffset) {
+          result.add(new OffsetRange(startOffset, endOffset));
+        }
+        startOffset = endOffset;
+      }
+      return result;
+    }
+  }
+
+  /**
+   * Shape of the progress reporting curve as a function of the current offset in the {@link
+   * SyntheticBoundedSource}.
+   */
+  public enum ProgressShape {
+    /** Reported progress grows linearly from 0 to 1. */
+    LINEAR,
+    /** Reported progress decreases linearly from 0.9 to 0.1. */
+    LINEAR_REGRESSING,
+  }
+
+  /**
+   * Synthetic bounded source options. These options are all specified as JSON; see the
+   * documentation of individual fields for details. {@code SyntheticSourceOptions} uses Jackson
+   * annotations which PipelineOptionsFactory can use to parse and construct an instance.
+   */
+  public static class SyntheticSourceOptions extends SyntheticOptions {
+    private static final long serialVersionUID = 0;
+
+    /** Total number of generated records. */
+    @JsonProperty public long numRecords;
+
+    /**
+     * Only records whose index is a multiple of this will be split points. 0 means the source is
+     * not dynamically splittable (but is perfectly statically splittable). In that case it also
+     * doesn't report progress at all.
+     */
+    @JsonProperty public long splitPointFrequencyRecords = 1;
+
+    /**
+     * Distribution for generating initial split bundles.
+     *
+     * <p>When splitting into "desiredBundleSizeBytes", we'll compute the desired number of bundles
+     * N, then sample this many numbers from this distribution, normalize their sum to 1, and use
+     * that as the boundaries of generated bundles.
+     *
+     * <p>The Zipf distribution is expected to be particularly useful here.
+     *
+     * <p>E.g., empirically, with 100 bundles, the Zipf distribution with a parameter of 3.5 will
+     * generate bundles where the largest is about 3x-10x larger than the median; with a parameter
+     * of 3.0 this ratio will be about 5x-50x; with 2.5, 5x-100x (i.e. 1 bundle can be as large as
+     * all others combined).
+     */
+    @JsonDeserialize(using = SamplerDeserializer.class)
+    public Sampler bundleSizeDistribution = fromRealDistribution(new ConstantRealDistribution(1));
+
+    /**
+     * If specified, this source will split into exactly this many bundles regardless of the hints
+     * provided by the service.
+     */
+    @JsonProperty public Integer forceNumInitialBundles;
+
+    /** See {@link ProgressShape}. */
+    @JsonProperty public ProgressShape progressShape = ProgressShape.LINEAR;
+
+    /**
+     * The distribution for the delay when reading from synthetic source starts. This delay is
+     * independent of the per-record delay and uses the same types of distributions as {@link
+     * #delayDistribution}.
+     */
+    @JsonDeserialize(using = SamplerDeserializer.class)
+    final Sampler initializeDelayDistribution =
+        fromRealDistribution(new ConstantRealDistribution(0));
+
+    /**
+     * Generates a random delay value for the synthetic source initialization using the distribution
+     * defined by {@link #initializeDelayDistribution}.
+     */
+    Duration nextInitializeDelay(long seed) {
+      return Duration.millis((long) initializeDelayDistribution.sample(seed));
+    }
+
+    @Override
+    public void validate() {
+      super.validate();
+      checkArgument(
+          numRecords >= 0, "numRecords should be a non-negative number, but found %s.", numRecords);
+      checkNotNull(bundleSizeDistribution, "bundleSizeDistribution");
+      checkArgument(
+          forceNumInitialBundles == null || forceNumInitialBundles > 0,
+          "forceNumInitialBundles, if specified, must be positive, but found %s",
+          forceNumInitialBundles);
+      checkArgument(
+          splitPointFrequencyRecords >= 0,
+          "splitPointFrequencyRecords must be non-negative, but found %s",
+          splitPointFrequencyRecords);
+    }
+
+    public Record genRecord(long position) {
+      // This method is supposed to generate random records deterministically,
+      // so that results can be reproduced by running the same scenario a second time.
+      // We need to initiate a Random object for each position to make the record deterministic
+      // because liquid sharding could split the Source at any position.
+      // And we also need a seed to initiate a Random object. The mapping from the position to
+      // the seed should be fixed. Using the position as seed to feed Random objects will cause the
+      // generated values to not be random enough because the position values are
+      // close to each other. To make seeds fed into the Random objects unrelated,
+      // we use a hashing function to map the position to its corresponding hashcode,
+      // and use the hashcode as a seed to feed into the Random object.
+      long hashCodeOfPosition = hashFunction.hashLong(position).asLong();
+      return new Record(genKvPair(hashCodeOfPosition), nextDelay(hashCodeOfPosition));
+    }
+
+    /** Record generated by {@link #genRecord}. */
+    public static class Record {
+      public final KV<byte[], byte[]> kv;
+      public final Duration sleepMsec;
+
+      Record(KV<byte[], byte[]> kv, long sleepMsec) {
+        this.kv = kv;
+        this.sleepMsec = new Duration(sleepMsec);
+      }
+    }
+  }
+
+  /**
+   * A reader over the {@link PCollection} of {@code KV<byte[], byte[]>} from the synthetic source.
+   *
+   * <p>The random but deterministic record at position "i" in the range [A, B) is generated by
+   * using {@link SyntheticSourceOptions#genRecord}. Reading each record sleeps according to the
+   * sleep time distribution in {@code SyntheticOptions}.
+   */
+  private static class SyntheticSourceReader
+      extends OffsetBasedSource.OffsetBasedReader<KV<byte[], byte[]>> {
+    private final long splitPointFrequencyRecords;
+
+    private KV<byte[], byte[]> currentKvPair;
+    private long currentOffset;
+    private boolean isAtSplitPoint;
+
+    SyntheticSourceReader(SyntheticBoundedSource source) {
+      super(source);
+      this.currentKvPair = null;
+      this.splitPointFrequencyRecords = source.sourceOptions.splitPointFrequencyRecords;
+    }
+
+    @Override
+    public synchronized SyntheticBoundedSource getCurrentSource() {
+      return (SyntheticBoundedSource) super.getCurrentSource();
+    }
+
+    @Override
+    protected long getCurrentOffset() throws IllegalStateException {
+      return currentOffset;
+    }
+
+    @Override
+    public KV<byte[], byte[]> getCurrent() throws NoSuchElementException {
+      if (currentKvPair == null) {
+        throw new NoSuchElementException(
+            "The current element is unavailable because either the reader is "
+                + "at the beginning of the input and start() or advance() wasn't called, "
+                + "or the last start() or advance() returned false.");
+      }
+      return currentKvPair;
+    }
+
+    @Override
+    public boolean allowsDynamicSplitting() {
+      return splitPointFrequencyRecords > 0;
+    }
+
+    @Override
+    protected final boolean startImpl() throws IOException {
+      this.currentOffset = getCurrentSource().getStartOffset();
+      if (splitPointFrequencyRecords > 0) {
+        while (currentOffset % splitPointFrequencyRecords != 0) {
+          ++currentOffset;
+        }
+      }
+
+      SyntheticSourceOptions options = getCurrentSource().sourceOptions;
+      SyntheticUtils.delay(
+          options.nextInitializeDelay(this.currentOffset),
+          options.cpuUtilizationInMixedDelay,
+          options.delayType,
+          new Random(this.currentOffset));
+
+      isAtSplitPoint = true;
+      --currentOffset;
+      return advanceImpl();
+    }
+
+    @Override
+    protected boolean advanceImpl() {
+      currentOffset++;
+      isAtSplitPoint =
+          (splitPointFrequencyRecords == 0) || (currentOffset % splitPointFrequencyRecords == 0);
+
+      SyntheticSourceOptions options = getCurrentSource().sourceOptions;
+      SyntheticSourceOptions.Record record = options.genRecord(currentOffset);
+      currentKvPair = record.kv;
+      // Per-record delay only; the one-time initialization delay is applied in startImpl().
+      // The Random handed to delay() is seeded from a hash of the generated record's value.
+      long hashCodeOfVal = options.hashFunction.hashBytes(currentKvPair.getValue()).asLong();
+      Random random = new Random(hashCodeOfVal);
+      SyntheticUtils.delay(
+          record.sleepMsec, options.cpuUtilizationInMixedDelay, options.delayType, random);
+
+      return true;
+    }
+
+    @Override
+    public void close() {
+      // Nothing to release: this reader opens no resources of its own.
+    }
+
+    @Override
+    public Double getFractionConsumed() {
+      double realFractionConsumed = super.getFractionConsumed();
+      ProgressShape shape = getCurrentSource().sourceOptions.progressShape;
+      switch (shape) {
+        case LINEAR:
+          return realFractionConsumed;
+        case LINEAR_REGRESSING:
+          return 0.9 - 0.8 * realFractionConsumed;
+        default:
+          throw new AssertionError("Unexpected progress shape: " + shape);
+      }
+    }
+
+    @Override
+    protected boolean isAtSplitPoint() throws NoSuchElementException {
+      return isAtSplitPoint;
+    }
+  }
+}
diff --git a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticOptions.java b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticOptions.java
new file mode 100644
index 0000000..b74fc06
--- /dev/null
+++ b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticOptions.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.synthetic;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import org.apache.beam.sdk.values.KV;
+import org.apache.commons.math3.distribution.ConstantRealDistribution;
+import org.apache.commons.math3.distribution.ExponentialDistribution;
+import org.apache.commons.math3.distribution.IntegerDistribution;
+import org.apache.commons.math3.distribution.NormalDistribution;
+import org.apache.commons.math3.distribution.RealDistribution;
+import org.apache.commons.math3.distribution.UniformRealDistribution;
+import org.apache.commons.math3.distribution.ZipfDistribution;
+
+/**
+ * This {@link SyntheticOptions} class provides common parameterizable synthetic options that are
+ * used by {@link SyntheticBoundedIO}.
+ */
+public class SyntheticOptions implements Serializable {
+  private static final long serialVersionUID = 0;
+
+  /**
+   * The type of Delay that will be produced.
+   *
+   * <p>CPU delay produces a CPU-busy delay. SLEEP delay makes the process sleep.
+   */
+  public enum DelayType {
+    SLEEP,
+    CPU,
+    MIXED,
+  }
+
+  /** Mapper for (de)serializing JSON. */
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  /**
+   * Wrapper over a distribution. Unfortunately commons-math does not provide a common interface
+   * over both RealDistribution and IntegerDistribution, and we sometimes need one and sometimes the
+   * other.
+   */
+  public interface Sampler extends Serializable {
+    double sample(long seed);
+
+    // Make this class a bean, so Jackson can serialize it during SyntheticOptions.toString().
+    Object getDistribution();
+  }
+
+  public static Sampler fromRealDistribution(final RealDistribution dist) {
+    return new Sampler() {
+      private static final long serialVersionUID = 0L;
+
+      @Override
+      public double sample(long seed) {
+        dist.reseedRandomGenerator(seed);
+        return dist.sample();
+      }
+
+      @Override
+      public Object getDistribution() {
+        return dist;
+      }
+    };
+  }
+
+  public static Sampler fromIntegerDistribution(final IntegerDistribution dist) {
+    return new Sampler() {
+      private static final long serialVersionUID = 0L;
+
+      @Override
+      public double sample(long seed) {
+        dist.reseedRandomGenerator(seed);
+        return dist.sample();
+      }
+
+      @Override
+      public Object getDistribution() {
+        return dist;
+      }
+    };
+  }
+
+  private static Sampler scaledSampler(final Sampler sampler, final double multiplier) {
+    return new Sampler() {
+      private static final long serialVersionUID = 0L;
+
+      @Override
+      public double sample(long seed) {
+        return multiplier * sampler.sample(seed);
+      }
+
+      @Override
+      public Object getDistribution() {
+        return sampler.getDistribution();
+      }
+    };
+  }
+
+  /** The key size in bytes. */
+  @JsonProperty public long keySizeBytes = 1;
+
+  /** The value size in bytes. */
+  @JsonProperty public long valueSizeBytes = 1;
+
+  /**
+   * The size of a single record used for size estimation in bytes. If less than zero, keySizeBytes
+   * + valueSizeBytes is used.
+   */
+  @JsonProperty public final long bytesPerRecord;
+
+  /** The number of distinct "hot" keys. */
+  @JsonProperty public long numHotKeys;
+
+  /**
+   * The fraction of records associated with "hot" keys, which are uniformly distributed over a
+   * fixed number of hot keys.
+   */
+  @JsonProperty public double hotKeyFraction;
+
+  /** The fraction of keys that should be larger than others. */
+  @JsonProperty public double largeKeyFraction = 0.0;
+
+  /** The size of large keys. */
+  @JsonProperty public double largeKeySizeBytes = 1000;
+
+  /** The seed is used for generating a hash function implementing the 128-bit murmur3 algorithm. */
+  @JsonIgnore public int seed;
+
+  /**
+   * The hash function is used to generate seeds that are fed into the random number generators and
+   * the sleep time distributions.
+   */
+  @JsonIgnore public transient HashFunction hashFunction;
+
+  /**
+   * SyntheticOptions supports several delay distributions including uniform, normal, exponential,
+   * and constant delay per record. The delay is either sleep or CPU spinning for the duration.
+   *
+   * <ul>
+   *   <li>The uniform delay distribution is specified through
+   *       "delayDistribution":{"type":"uniform","lower":lower_bound,"upper":upper_bound}, where
+   *       lower_bound and upper_bound are non-negative numbers representing the delay range in
+   *       milliseconds.
+   *   <li>The normal delay distribution is specified through
+   *       "delayDistribution":{"type":"normal","mean":mean,"stddev":stddev}, where mean is a
+   *       non-negative number representing the mean of this normal distributed delay in
+   *       milliseconds and stddev is a positive number representing its standard deviation.
+   *   <li>The exponential delay distribution is specified through
+   *       "delayDistribution":{"type":"exp","mean":mean}, where mean is a positive number
+   *       representing the mean of this exponentially distributed delay in milliseconds.
+   *   <li>The zipf distribution is specified through
+   *       "delayDistribution":{"type":"zipf","param":param,"multiplier":multiplier}, where param is
+   *       a number > 1 and multiplier just scales the output of the distribution. By default, the
+   *       multiplier is 1. Parameters closer to 1 produce dramatically more skewed results. E.g.
+   *       given 100 samples, the min will almost always be 1, while max with param 3 will usually
+   *       be below 10; with param 2 max will usually be between several dozen and several hundred;
+   *       with param 1.5, thousands to millions.
+   *   <li>The constant sleep time per record is specified through
+   *       "delayDistribution":{"type":"const","const":const} where const is a non-negative number
+   *       representing the constant sleep time in milliseconds.
+   * </ul>
+   *
+   * <p>The field delayDistribution is not used in the synthetic unbounded source. The synthetic
+   * unbounded source uses RateLimiter to control QPS.
+   */
+  @JsonDeserialize(using = SamplerDeserializer.class)
+  private final Sampler delayDistribution = fromRealDistribution(new ConstantRealDistribution(0));
+
+  /**
+   * When 'delayDistribution' is configured, this indicates how the delay is enforced ("SLEEP",
+   * "CPU", or "MIXED").
+   */
+  @JsonProperty public final DelayType delayType = DelayType.SLEEP;
+
+  /**
+   * CPU utilization when delayType is 'MIXED'. This determines the fraction of processing time
+   * spent spinning. The remaining time is spent sleeping. For each millisecond of processing time
+   * we choose to spin with probability equal to this fraction.
+   */
+  @JsonProperty public final double cpuUtilizationInMixedDelay;
+
+  SyntheticOptions() {
+    cpuUtilizationInMixedDelay = 0.1;
+    bytesPerRecord = -1;
+  }
+
+  @JsonDeserialize
+  public void setSeed(int seed) {
+    this.seed = seed;
+    this.hashFunction = Hashing.murmur3_128(seed);
+  }
+
+  static class SamplerDeserializer extends JsonDeserializer<Sampler> {
+    @Override
+    public Sampler deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException {
+      JsonNode node = jp.getCodec().readTree(jp);
+      String type = node.get("type").asText();
+      switch (type) {
+        case "uniform":
+          {
+            double lowerBound = node.get("lower").asDouble();
+            double upperBound = node.get("upper").asDouble();
+            checkArgument(
+                lowerBound >= 0,
+                "The lower bound of uniform distribution should be a non-negative number, "
+                    + "but found %s.",
+                lowerBound);
+            return fromRealDistribution(new UniformRealDistribution(lowerBound, upperBound));
+          }
+        case "exp":
+          {
+            double mean = node.get("mean").asDouble();
+            return fromRealDistribution(new ExponentialDistribution(mean));
+          }
+        case "normal":
+          {
+            double mean = node.get("mean").asDouble();
+            double stddev = node.get("stddev").asDouble();
+            checkArgument(
+                mean >= 0,
+                "The mean of normal distribution should be a non-negative number, but found %s.",
+                mean);
+            return fromRealDistribution(new NormalDistribution(mean, stddev));
+          }
+        case "const":
+          {
+            double constant = node.get("const").asDouble();
+            checkArgument(
+                constant >= 0,
+                "The value of constant distribution should be a non-negative number, but found %s.",
+                constant);
+            return fromRealDistribution(new ConstantRealDistribution(constant));
+          }
+        case "zipf":
+          {
+            double param = node.get("param").asDouble();
+            final double multiplier =
+                node.has("multiplier") ? node.get("multiplier").asDouble() : 1.0;
+            checkArgument(
+                param > 1,
+                "The parameter of the Zipf distribution should be > 1, but found %s.",
+                param);
+            checkArgument(
+                multiplier >= 0,
+                "The multiplier of the Zipf distribution should be >= 0, but found %s.",
+                multiplier);
+            final ZipfDistribution dist = new ZipfDistribution(100, param);
+            return scaledSampler(fromIntegerDistribution(dist), multiplier);
+          }
+        default:
+          {
+            throw new IllegalArgumentException("Unknown distribution type: " + type);
+          }
+      }
+    }
+  }
+
+  public void validate() {
+    checkArgument(
+        keySizeBytes > 0, "keySizeBytes should be a positive number, but found %s", keySizeBytes);
+    checkArgument(
+        valueSizeBytes >= 0,
+        "valueSizeBytes should be a non-negative number, but found %s",
+        valueSizeBytes);
+    checkArgument(
+        numHotKeys >= 0, "numHotKeys should be a non-negative number, but found %s", numHotKeys);
+    checkArgument(
+        hotKeyFraction >= 0,
+        "hotKeyFraction should be a non-negative number, but found %s",
+        hotKeyFraction);
+    checkArgument(hashFunction != null, "hashFunction hasn't been initialized.");
+    if (hotKeyFraction > 0) {
+      int intBytes = Integer.SIZE / 8;
+      checkArgument(
+          keySizeBytes >= intBytes,
+          "Allowing hot keys (hotKeyFraction=%s) requires keySizeBytes "
+              + "to be at least %s, but found %s",
+          hotKeyFraction,
+          intBytes,
+          keySizeBytes);
+    }
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return MAPPER.writeValueAsString(this);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public long nextDelay(long seed) {
+    return (long) delayDistribution.sample(seed);
+  }
+
+  public KV<byte[], byte[]> genKvPair(long seed) {
+    Random random = new Random(seed);
+
+    byte[] key = new byte[(int) keySizeBytes];
+    // Set the user-key to contain characters other than ordered-code escape characters
+    // (specifically '\0' or '\xff'). The user-key is encoded into the shuffle-key using
+    // ordered-code, and the shuffle-key is then checked for size limit violations. A user-key
+    // consisting of '\0' keySizeBytes would produce a shuffle-key encoding double in size,
+    // which would go over the shuffle-key limit (see b/28770924).
+    for (int i = 0; i < keySizeBytes; ++i) {
+      key[i] = 42;
+    }
+    // Determines whether to generate hot key or not.
+    if (random.nextDouble() < hotKeyFraction) {
+      // Generate hot key.
+      // An integer is randomly selected from the range [0, numHotKeys-1] with equal probability.
+      int randInt = random.nextInt((int) numHotKeys);
+      ByteBuffer.wrap(key).putInt(hashFunction.hashInt(randInt).asInt());
+    } else {
+      // Note that the random generated key might be a hot key.
+      // But the probability of being a hot key is very small.
+      random.nextBytes(key);
+    }
+
+    byte[] val = new byte[(int) valueSizeBytes];
+    random.nextBytes(val);
+    return KV.of(key, val);
+  }
+}
diff --git a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticUtils.java b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticUtils.java
new file mode 100644
index 0000000..507b962
--- /dev/null
+++ b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticUtils.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.synthetic;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.hash.Hashing;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import org.joda.time.Duration;
+
+/** Utility functions used in {@link org.apache.beam.sdk.io.synthetic}. */
+class SyntheticUtils {
+  // cpu delay implementation:
+
+  private static final long MASK = (1L << 16) - 1L;
+  private static final long HASH = 0x243F6A8885A308D3L;
+  private static final long INIT_PLAINTEXT = 50000L;
+
+  /** Keep cpu busy for {@code delayMillis} by calculating lots of hashes. */
+  private static void cpuDelay(long delayMillis) {
+    // Note that the delay is enforced in terms of walltime. That implies this thread may not
+    // keep CPU busy if it gets preempted by other threads. There is more of a chance of this
+    // occurring in a streaming pipeline as there could be lots of threads running this. The timer
+    // only runs while an iteration is executing, so that these effects are somewhat minimized.
+
+    long cpuMicros = delayMillis * 1000;
+    Stopwatch timer = Stopwatch.createUnstarted();
+
+    while (timer.elapsed(TimeUnit.MICROSECONDS) < cpuMicros) {
+      // Find a long which hashes to HASH in lowest MASK bits.
+      // Values chosen to roughly take 1ms on typical workstation.
+      timer.start();
+      long p = INIT_PLAINTEXT;
+      while (true) {
+        long t = Hashing.murmur3_128().hashLong(p).asLong();
+        if ((t & MASK) == (HASH & MASK)) {
+          break;
+        }
+        p++;
+      }
+      timer.stop();
+    }
+  }
+
+  /**
+   * Delays the calling thread in one of several fashions. {@code CPU}: burn CPU while waiting.
+   * {@code SLEEP}: sleep uninterruptibly while waiting. {@code MIXED}: switch between burning CPU
+   * and sleeping every millisecond to emulate the CPU utilization given by {@code
+   * cpuUtilizationInMixedDelay}. A negative {@code delay} is treated as no delay at all.
+   *
+   * @return Millis spent sleeping (never negative), does not include time spent spinning.
+   */
+  static long delay(
+      Duration delay,
+      double cpuUtilizationInMixedDelay,
+      SyntheticOptions.DelayType delayType,
+      Random rnd) {
+    switch (delayType) {
+      case CPU:
+        cpuDelay(delay.getMillis());
+        return 0;
+      case SLEEP:
+        Uninterruptibles.sleepUninterruptibly(
+            Math.max(0L, delay.getMillis()), TimeUnit.MILLISECONDS);
+        return Math.max(0L, delay.getMillis());
+      case MIXED:
+        // Mixed mode: for each millisecond of delay randomly choose to spin or sleep.
+        // This is enforced at millisecond granularity since that is the minimum duration that
+        // Thread.sleep() can sleep. Millisecond is also the unit of processing delay.
+        long sleepMillis = 0;
+        for (long i = 0; i < delay.getMillis(); i++) {
+          if (rnd.nextDouble() < cpuUtilizationInMixedDelay) {
+            delay(new Duration(1), 0.0, SyntheticOptions.DelayType.CPU, rnd);
+          } else {
+            sleepMillis += delay(new Duration(1), 0.0, SyntheticOptions.DelayType.SLEEP, rnd);
+          }
+        }
+        return sleepMillis;
+      default:
+        throw new IllegalArgumentException("Unknown delay type " + delayType);
+    }
+  }
+}
diff --git a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/package-info.java b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/package-info.java
new file mode 100644
index 0000000..d2f9d71
--- /dev/null
+++ b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** Transforms for performing Synthetic Operations in Apache Beam pipelines. */
+package org.apache.beam.sdk.io.synthetic;
diff --git a/sdks/java/io/synthetic/src/test/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedIOTest.java b/sdks/java/io/synthetic/src/test/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedIOTest.java
new file mode 100644
index 0000000..ca648fa
--- /dev/null
+++ b/sdks/java/io/synthetic/src/test/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedIOTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.synthetic;
+
+import static org.apache.beam.sdk.io.synthetic.SyntheticOptions.fromIntegerDistribution;
+import static org.apache.beam.sdk.io.synthetic.SyntheticOptions.fromRealDistribution;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO.SyntheticBoundedSource;
+import org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO.SyntheticSourceOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.SourceTestUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.commons.math3.distribution.ConstantRealDistribution;
+import org.apache.commons.math3.distribution.ZipfDistribution;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link SyntheticBoundedIO}. */
+@RunWith(JUnit4.class)
+public class SyntheticBoundedIOTest {
+  @Rule public final ExpectedException thrown = ExpectedException.none();
+
+  private SyntheticSourceOptions testSourceOptions = new SyntheticSourceOptions();
+
+  @Before
+  public void setUp() {
+    testSourceOptions.splitPointFrequencyRecords = 1;
+    testSourceOptions.numRecords = 10;
+    testSourceOptions.keySizeBytes = 10;
+    testSourceOptions.valueSizeBytes = 20;
+    testSourceOptions.numHotKeys = 3;
+    testSourceOptions.hotKeyFraction = 0.3;
+    testSourceOptions.setSeed(123456);
+    testSourceOptions.bundleSizeDistribution =
+        fromIntegerDistribution(new ZipfDistribution(100, 2.5));
+    testSourceOptions.forceNumInitialBundles = null;
+  }
+
+  private SyntheticSourceOptions fromString(String jsonString) throws IOException {
+    ObjectMapper mapper = new ObjectMapper();
+    SyntheticSourceOptions result = mapper.readValue(jsonString, SyntheticSourceOptions.class);
+    result.validate();
+    return result;
+  }
+
+  @Test
+  public void testInvalidSourceOptionsJsonFormat() throws Exception {
+    thrown.expect(JsonParseException.class);
+    String syntheticSourceOptions = "input:unknown URI";
+    fromString(syntheticSourceOptions);
+  }
+
+  @Test
+  public void testFromString() throws Exception {
+    String syntheticSourceOptions =
+        "{\"numRecords\":100,\"splitPointFrequencyRecords\":10,\"keySizeBytes\":10,"
+            + "\"valueSizeBytes\":20,\"numHotKeys\":3,"
+            + "\"hotKeyFraction\":0.3,\"seed\":123456,"
+            + "\"bundleSizeDistribution\":{\"type\":\"const\",\"const\":42},"
+            + "\"forceNumInitialBundles\":10,\"progressShape\":\"LINEAR_REGRESSING\""
+            + "}";
+    SyntheticSourceOptions sourceOptions = fromString(syntheticSourceOptions);
+    assertEquals(100, sourceOptions.numRecords);
+    assertEquals(10, sourceOptions.splitPointFrequencyRecords);
+    assertEquals(10, sourceOptions.keySizeBytes);
+    assertEquals(20, sourceOptions.valueSizeBytes);
+    assertEquals(3, sourceOptions.numHotKeys);
+    assertEquals(0.3, sourceOptions.hotKeyFraction, 0);
+    assertEquals(0, sourceOptions.nextDelay(sourceOptions.seed));
+    assertEquals(123456, sourceOptions.seed);
+    assertEquals(42, sourceOptions.bundleSizeDistribution.sample(123), 0.0);
+    assertEquals(10, sourceOptions.forceNumInitialBundles.intValue());
+    assertEquals(SyntheticBoundedIO.ProgressShape.LINEAR_REGRESSING, sourceOptions.progressShape);
+  }
+
+  @Test
+  public void testSourceOptionsWithNegativeNumRecords() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("numRecords should be a non-negative number, but found -100");
+    testSourceOptions.numRecords = -100;
+    testSourceOptions.validate();
+  }
+
+  /** Test the reader and the source produces the same records. */
+  @Test
+  public void testSourceAndReadersWork() throws Exception {
+    testSourceAndReadersWorkP(1);
+    testSourceAndReadersWorkP(-1);
+    testSourceAndReadersWorkP(3);
+  }
+
+  private void testSourceAndReadersWorkP(long splitPointFrequency) throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    testSourceOptions.splitPointFrequencyRecords = splitPointFrequency;
+    SyntheticBoundedSource source = new SyntheticBoundedSource(testSourceOptions);
+    assertEquals(10 * (10 + 20), source.getEstimatedSizeBytes(options));
+    SourceTestUtils.assertUnstartedReaderReadsSameAsItsSource(
+        source.createReader(options), options);
+  }
+
+  @Test
+  public void testSplitAtFraction() throws Exception {
+    testSplitAtFractionP(1);
+    testSplitAtFractionP(3);
+    // Do not test "-1" because then splits would be vacuous
+  }
+
+  private void testSplitAtFractionP(long splitPointFrequency) throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    testSourceOptions.splitPointFrequencyRecords = splitPointFrequency;
+    SyntheticBoundedSource source = new SyntheticBoundedSource(testSourceOptions);
+    SourceTestUtils.assertSplitAtFractionExhaustive(source, options);
+    // Can't split if already consumed.
+    SourceTestUtils.assertSplitAtFractionFails(source, 5, 0.3, options);
+    SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent(source, 1, 0.3, options);
+  }
+
+  @Test
+  public void testSplitIntoBundles() throws Exception {
+    testSplitIntoBundlesP(1);
+    testSplitIntoBundlesP(-1);
+    testSplitIntoBundlesP(5);
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    testSourceOptions.forceNumInitialBundles = 37;
+    assertEquals(
+        37,
+        new SyntheticBoundedIO.SyntheticBoundedSource(testSourceOptions).split(42, options).size());
+  }
+
+  private void testSplitIntoBundlesP(long splitPointFrequency) throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    testSourceOptions.splitPointFrequencyRecords = splitPointFrequency;
+    testSourceOptions.numRecords = 100;
+    SyntheticBoundedSource source = new SyntheticBoundedSource(testSourceOptions);
+    SourceTestUtils.assertSourcesEqualReferenceSource(source, source.split(10, options), options);
+    SourceTestUtils.assertSourcesEqualReferenceSource(source, source.split(40, options), options);
+    SourceTestUtils.assertSourcesEqualReferenceSource(source, source.split(100, options), options);
+  }
+
+  @Test
+  public void testIncreasingProgress() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    testSourceOptions.progressShape = SyntheticBoundedIO.ProgressShape.LINEAR;
+    SyntheticBoundedSource source = new SyntheticBoundedSource(testSourceOptions);
+    BoundedSource.BoundedReader<KV<byte[], byte[]>> reader = source.createReader(options);
+    // Reader starts at 0.0 progress.
+    assertEquals(0, reader.getFractionConsumed(), 1e-5);
+    // Set the lastFractionConsumed < 0.0 so that we can use strict inequality in the below loop.
+    double lastFractionConsumed = -1.0;
+    for (boolean more = reader.start(); more; more = reader.advance()) {
+      assertTrue(reader.getFractionConsumed() > lastFractionConsumed);
+      lastFractionConsumed = reader.getFractionConsumed();
+    }
+    assertEquals(1, reader.getFractionConsumed(), 1e-5);
+  }
+
+  @Test
+  public void testRegressingProgress() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    testSourceOptions.progressShape = SyntheticBoundedIO.ProgressShape.LINEAR_REGRESSING;
+    SyntheticBoundedSource source = new SyntheticBoundedSource(testSourceOptions);
+    BoundedSource.BoundedReader<KV<byte[], byte[]>> reader = source.createReader(options);
+    double lastFractionConsumed = reader.getFractionConsumed();
+    for (boolean more = reader.start(); more; more = reader.advance()) {
+      assertTrue(reader.getFractionConsumed() <= lastFractionConsumed);
+      lastFractionConsumed = reader.getFractionConsumed();
+    }
+  }
+
+  @Test
+  public void testSplitIntoSingleRecordBundles() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    SyntheticSourceOptions sourceOptions = new SyntheticSourceOptions();
+    sourceOptions.numRecords = 10;
+    sourceOptions.setSeed(123456);
+    sourceOptions.bundleSizeDistribution = fromRealDistribution(new ConstantRealDistribution(1.0));
+    sourceOptions.forceNumInitialBundles = 10;
+    SyntheticBoundedSource source = new SyntheticBoundedSource(sourceOptions);
+    List<SyntheticBoundedSource> sources = source.split(42L, options);
+    for (SyntheticBoundedSource recordSource : sources) {
+      recordSource.validate();
+      assertEquals(1, recordSource.getEndOffset() - recordSource.getStartOffset());
+    }
+    SourceTestUtils.assertSourcesEqualReferenceSource(source, sources, options);
+  }
+}
diff --git a/sdks/python/apache_beam/testing/synthetic_pipeline.py b/sdks/python/apache_beam/testing/synthetic_pipeline.py
new file mode 100644
index 0000000..c76b9cd
--- /dev/null
+++ b/sdks/python/apache_beam/testing/synthetic_pipeline.py
@@ -0,0 +1,502 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A set of utilities to write pipelines for performance tests.
+
+This module offers a way to create pipelines using synthetic sources and steps.
+Exact shape of the pipeline and the behaviour of sources and steps can be
+controlled through arguments. Please see function 'parse_args()' for more
+details about the arguments.
+
+Shape of the pipeline is primarily controlled through two arguments. Argument
+'steps' can be used to define a list of steps as a JSON string. Argument
+'barrier' describes how these steps are separated from each other. Argument
+'barrier' can be used to build a pipeline as a series of steps or a tree of
+steps with a fanin or a fanout of size 2.
+
+Other arguments describe what gets generated by synthetic sources that produce
+data for the pipeline.
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+
+import argparse
+import json
+import logging
+import math
+import time
+
+import apache_beam as beam
+from apache_beam.io import WriteToText
+from apache_beam.io import iobase
+from apache_beam.io import range_trackers
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.testing.test_pipeline import TestPipeline
+
+try:
+  import numpy as np
+except ImportError:
+  np = None
+
+
+def parse_byte_size(s):
+  suffixes = 'BKMGTP'
+  if s[-1] in suffixes:
+    return int(float(s[:-1]) * 1024**suffixes.index(s[-1]))
+
+  return int(s)
+
+
+def div_round_up(a, b):
+  """Return ceil(a/b)."""
+  return int(math.ceil(float(a) / b))
+
+
+def rotate_key(element):
+  """Returns a new key-value pair of the same size but with a different key."""
+  (key, value) = element
+  return key[-1] + key[:-1], value
+
+
+class SyntheticStep(beam.DoFn):
+  """A DoFn whose behavior can be controlled through prespecified parameters.
+  """
+
+  def __init__(self, per_element_delay_sec=0, per_bundle_delay_sec=0,
+               output_records_per_input_record=1, output_filter_ratio=0):
+    if per_element_delay_sec and per_element_delay_sec < 1e-3:
+      raise ValueError('Per element sleep time must be at least 1e-3. '
+                       'Received: %r' % per_element_delay_sec)
+    self._per_element_delay_sec = per_element_delay_sec
+    self._per_bundle_delay_sec = per_bundle_delay_sec
+    self._output_records_per_input_record = output_records_per_input_record
+    self._output_filter_ratio = output_filter_ratio
+
+  def start_bundle(self):
+    self._start_time = time.time()
+
+  def finish_bundle(self):
+    # The target is for the enclosing stage to take as close to as possible
+    # the given number of seconds, so we only sleep enough to make up for
+    # overheads not incurred elsewhere.
+    to_sleep = self._per_bundle_delay_sec - (time.time() - self._start_time)
+
+    # Ignoring sub-millisecond sleep times.
+    if to_sleep >= 1e-3:
+      time.sleep(to_sleep)
+
+  def process(self, element):
+    if self._per_element_delay_sec >= 1e-3:
+      time.sleep(self._per_element_delay_sec)
+    filter_element = False
+    if self._output_filter_ratio > 0:
+      if np.random.random() < self._output_filter_ratio:
+        filter_element = True
+
+    if not filter_element:
+      for _ in range(self._output_records_per_input_record):
+        yield element
+
+
+class SyntheticSource(iobase.BoundedSource):
+  """A custom source of a specified size.
+  """
+
+  def __init__(self, input_spec):
+    """Initiates a synthetic source.
+
+    Args:
+      input_spec: Input specification of the source. See corresponding option in
+                  function 'parse_args()' below for more details.
+    Raises:
+      ValueError: if input parameters are invalid.
+    """
+
+    def maybe_parse_byte_size(s):
+      return parse_byte_size(s) if isinstance(s, str) else int(s)
+
+    self._num_records = input_spec['numRecords']
+    self._key_size = maybe_parse_byte_size(input_spec.get('keySizeBytes', 1))
+    self._value_size = maybe_parse_byte_size(
+        input_spec.get('valueSizeBytes', 1))
+    self._total_size = self.element_size * self._num_records
+    self._initial_splitting = (
+        input_spec['bundleSizeDistribution']['type']
+        if 'bundleSizeDistribution' in input_spec else 'const')
+    if self._initial_splitting != 'const' and self._initial_splitting != 'zipf':
+      raise ValueError(
+          'Only const and zipf distributions are supported for determining '
+          'sizes of bundles produced by initial splitting. Received: %s' %
+          self._initial_splitting)
+    self._initial_splitting_num_bundles = (
+        input_spec['forceNumInitialBundles']
+        if 'forceNumInitialBundles' in input_spec else 0)
+    if self._initial_splitting == 'zipf':
+      self._initial_splitting_distribution_parameter = (
+          input_spec['bundleSizeDistribution']['param'])
+      if self._initial_splitting_distribution_parameter <= 1:
+        raise ValueError(
+            'Parameter for a Zipf distribution must be larger than 1. '
+            'Received %r.' % self._initial_splitting_distribution_parameter)
+    else:
+      self._initial_splitting_distribution_parameter = 0
+    self._dynamic_splitting = (
+        'none' if (
+            'splitPointFrequencyRecords' in input_spec
+            and input_spec['splitPointFrequencyRecords'] == 0)
+        else 'perfect')
+    if 'delayDistribution' in input_spec:
+      if input_spec['delayDistribution']['type'] != 'const':
+        raise ValueError('SyntheticSource currently only supports delay '
+                         'distributions of type \'const\'. Received %s.' %
+                         input_spec['delayDistribution']['type'])
+      self._sleep_per_input_record_sec = (
+          float(input_spec['delayDistribution']['const']) / 1000)
+      if (self._sleep_per_input_record_sec and
+          self._sleep_per_input_record_sec < 1e-3):
+        raise ValueError('Sleep time per input record must be at least 1e-3.'
+                         ' Received: %r' % self._sleep_per_input_record_sec)
+    else:
+      self._sleep_per_input_record_sec = 0
+
+  @property
+  def element_size(self):
+    return self._key_size + self._value_size
+
+  def estimate_size(self):
+    return self._total_size
+
+  def split(self, desired_bundle_size, start_position=0, stop_position=None):
+    # Performs initial splitting of SyntheticSource.
+    #
+    # Exact sizes and distribution of initial splits generated here depend on
+    # the input specification of the SyntheticSource.
+
+    if stop_position is None:
+      stop_position = self._num_records
+    if self._initial_splitting == 'zipf':
+      desired_num_bundles = self._initial_splitting_num_bundles or int(
+          math.ceil(float(self.estimate_size()) / desired_bundle_size))
+      samples = np.random.zipf(self._initial_splitting_distribution_parameter,
+                               desired_num_bundles)
+      total = sum(samples)
+      relative_bundle_sizes = [(float(sample) / total) for sample in samples]
+      bundle_ranges = []
+      start = start_position
+      index = 0
+      while start < stop_position:
+        if index == desired_num_bundles - 1:
+          bundle_ranges.append((start, stop_position))
+          break
+        stop = start + int(self._num_records * relative_bundle_sizes[index])
+        bundle_ranges.append((start, stop))
+        start = stop
+        index += 1
+    else:
+      if self._initial_splitting_num_bundles:
+        bundle_size_in_elements = max(1, self._num_records //
+                                      self._initial_splitting_num_bundles)
+      else:
+        bundle_size_in_elements = (max(
+            div_round_up(desired_bundle_size, self.element_size),
+            int(math.floor(math.sqrt(self._num_records)))))
+      bundle_ranges = []
+      for start in range(start_position, stop_position,
+                         bundle_size_in_elements):
+        stop = min(start + bundle_size_in_elements, stop_position)
+        bundle_ranges.append((start, stop))
+
+    for start, stop in bundle_ranges:
+      yield iobase.SourceBundle(stop - start, self, start, stop)
+
+  def get_range_tracker(self, start_position, stop_position):
+    if start_position is None:
+      start_position = 0
+    if stop_position is None:
+      stop_position = self._num_records
+    tracker = range_trackers.OffsetRangeTracker(start_position, stop_position)
+    if self._dynamic_splitting == 'none':
+      tracker = range_trackers.UnsplittableRangeTracker(tracker)
+    return tracker
+
+  def read(self, range_tracker):
+    index = range_tracker.start_position()
+    while range_tracker.try_claim(index):
+      r = np.random.RandomState(index)
+
+      time.sleep(self._sleep_per_input_record_sec)
+      yield r.bytes(self._key_size), r.bytes(self._value_size)
+      index += 1
+
+  def default_output_coder(self):
+    return beam.coders.TupleCoder(
+        [beam.coders.BytesCoder(), beam.coders.BytesCoder()])
+
+
+class ShuffleBarrier(beam.PTransform):
+
+  def expand(self, pc):
+    return (pc
+            | beam.Map(rotate_key)
+            | beam.GroupByKey()
+            | 'Ungroup' >> beam.FlatMap(
+                lambda elm: [(elm[0], v) for v in elm[1]]))
+
+
+class SideInputBarrier(beam.PTransform):
+
+  def expand(self, pc):
+    return (pc
+            | beam.Map(rotate_key)
+            | beam.Map(
+                lambda elem, ignored: elem,
+                beam.pvalue.AsIter(pc | beam.FlatMap(lambda elem: None))))
+
+
+def merge_using_gbk(name, pc1, pc2):
+  """Merges two given PCollections using a CoGroupByKey.
+
+  Every element is keyed by itself, both inputs are co-grouped on that key and
+  only the key of each group is emitted.
+
+  Args:
+    name: prefix for the labels of the transforms applied here.
+    pc1: first PCollection to merge.
+    pc2: second PCollection to merge.
+
+  Returns:
+    A PCollection holding the key of every co-grouped entry.
+  """
+  keyed_inputs = {
+      'pc1': pc1 | (name + 'AttachKey1') >> beam.Map(lambda x: (x, x)),
+      'pc2': pc2 | (name + 'AttachKey2') >> beam.Map(lambda x: (x, x)),
+  }
+  co_grouped = keyed_inputs | (name + 'Group') >> beam.CoGroupByKey()
+
+  # Only the key is kept; the grouped values are ignored.
+  return co_grouped | (name + 'DeDup') >> beam.Map(lambda elm: elm[0])
+
+
+def merge_using_side_input(name, pc1, pc2):
+  """Merges two given PCollections using side inputs.
+
+  Args:
+    name: label of the applied transform.
+    pc1: main input; its elements are emitted unchanged.
+    pc2: PCollection passed as an iterable side input; its content is ignored.
+
+  Returns:
+    The PCollection produced by mapping ``pc1`` with ``pc2`` as side input.
+  """
+  side_input = beam.pvalue.AsIter(pc2)
+  # The second lambda argument receives the side input and is discarded.
+  return pc1 | name >> beam.core.Map(lambda val, _: val, side_input)
+
+
+def expand_using_gbk(name, pc):
+  """Expands a given PCollection into two copies using GroupByKey.
+
+  Args:
+    name: label prefix; the two barriers are labeled '<name>.a' and
+      '<name>.b'.
+    pc: the PCollection to expand.
+
+  Returns:
+    A list with the outputs of two ``ShuffleBarrier`` transforms applied to
+    ``pc``.
+  """
+  return [pc | (label_template % name) >> ShuffleBarrier()
+          for label_template in ('%s.a', '%s.b')]
+
+
+def expand_using_second_output(name, pc):
+  """Expands a given PCollection into two copies using side outputs.
+
+  Args:
+    name: label of the applied ``ParDo``.
+    pc: the PCollection to expand.
+
+  Returns:
+    A list with the two PCollections obtained by unpacking the outputs of the
+    ``ParDo``.
+  """
+
+  class ExpandFn(beam.DoFn):
+    """Emits every element on the tagged output and on the untagged one."""
+
+    def process(self, element):
+      yield beam.pvalue.TaggedOutput('second_out', element)
+      yield element
+
+  multi_output = beam.ParDo(ExpandFn()).with_outputs(
+      'second_out', main='main_out')
+  first_pc, second_pc = pc | name >> multi_output
+  return [first_pc, second_pc]
+
+
+def _parse_steps(json_str):
+  """Converts the JSON step description into Python objects.
+
+  See property 'steps' for more details about the JSON step description.
+
+  Args:
+    json_str: a JSON string that describes the steps.
+
+  Returns:
+    A list with one dictionary per step. Each dictionary has the following
+    entries.
+    (1) per_element_delay - amount of delay for each element in seconds
+        (read from 'per_element_delay_msec', 0 if absent).
+    (2) per_bundle_delay - minimum amount of delay for a given step in seconds
+        (read from 'per_bundle_delay_sec', 0 if absent).
+    (3) output_records_per_input_record - number of output elements generated
+        for each input element to a step (1 if absent).
+    (4) output_filter_ratio - the probability at which a step may filter out a
+        given element by not producing any output for that element (0 if
+        absent).
+  """
+
+  def read_field(spec, field, convert, default):
+    # Converts the field when it is present, otherwise uses the default.
+    return convert(spec[field]) if field in spec else default
+
+  parsed_steps = []
+  for spec in json.loads(json_str):
+    parsed_steps.append({
+        'per_element_delay': read_field(
+            spec, 'per_element_delay_msec',
+            lambda msec: float(msec) / 1000, 0),
+        'per_bundle_delay': read_field(
+            spec, 'per_bundle_delay_sec', float, 0),
+        'output_records_per_input_record': read_field(
+            spec, 'output_records_per_input_record', int, 1),
+        'output_filter_ratio': read_field(
+            spec, 'output_filter_ratio', float, 0),
+    })
+  return parsed_steps
+
+
+def parse_args(args):
+  """Parses a given set of arguments.
+
+  Args:
+    args: set of arguments to be passed.
+
+  Returns:
+    a tuple where first item gives the set of arguments defined and parsed
+    within this method and second item gives the set of unknown arguments.
+    '--steps' is already converted by ``_parse_steps`` and '--input' by
+    ``json.loads``; both are ``None`` when not given.
+  """
+
+  parser = argparse.ArgumentParser()
+  # Every help fragment ends with a space so that the implicitly concatenated
+  # literals do not run into each other.
+  parser.add_argument(
+      '--steps',
+      dest='steps',
+      type=_parse_steps,
+      help='A JSON string that gives a list where each entry of the list is '
+           'configuration information for a step. Configuration for each step '
+           'consists of '
+           '(1) A float "per_bundle_delay_sec" (in seconds). Defaults to 0. '
+           '(2) A float "per_element_delay_msec" (in milli seconds). '
+           'Defaults to 0. '
+           '(3) An integer "output_records_per_input_record". Defaults to 1. '
+           '(4) A float "output_filter_ratio" in the range [0, 1]. '
+           'Defaults to 0.')
+
+  parser.add_argument(
+      '--input',
+      dest='input',
+      type=json.loads,
+      help='A JSON string that describes the properties of the SyntheticSource '
+           'used by the pipeline. Configuration is similar to Java '
+           'SyntheticBoundedInput. '
+           'Currently supports the following properties. '
+           '(1) An integer "numRecords". '
+           '(2) An integer "keySizeBytes". '
+           '(3) An integer "valueSizeBytes". '
+           '(4) A dictionary "bundleSizeDistribution" with the following '
+           'values. '
+           'A string "type". Allowed values are "const" and "zipf". '
+           'A float "param". Only used if "type"=="zipf". Must be '
+           'larger than 1. '
+           '(5) An integer "forceNumInitialBundles". '
+           '(6) An integer "splitPointFrequencyRecords". '
+           '(7) A dictionary "delayDistribution" with the following values. '
+           'A string "type". Only allowed value is "const". '
+           'An integer "const".')
+
+  parser.add_argument('--barrier',
+                      dest='barrier',
+                      default='shuffle',
+                      choices=['shuffle', 'side-input', 'expand-gbk',
+                               'expand-second-output', 'merge-gbk',
+                               'merge-side-input'],
+                      help='Type of barrier to insert between consecutive '
+                           'steps. Defaults to "shuffle".')
+  parser.add_argument('--output',
+                      dest='output',
+                      default='',
+                      help='Destination to write output.')
+
+  return parser.parse_known_args(args)
+
+
+def _apply_barrier(barrier, step_no, pc_list):
+  """Applies the given barrier type to every PCollection in front of a step.
+
+  Args:
+    barrier: one of the values accepted by the '--barrier' argument.
+    step_no: index of the step the barrier precedes; used in labels.
+    pc_list: the PCollections produced by the previous step.
+
+  Returns:
+    The list of PCollections to feed into step ``step_no``. Expand barriers
+    return two PCollections per input, merge barriers one per pair of inputs.
+  """
+  new_pc_list = []
+  for pc_no, pc in enumerate(pc_list):
+    label = '%s %d.%d' % (barrier, step_no, pc_no)
+    if barrier == 'shuffle':
+      new_pc_list.append(pc | label >> ShuffleBarrier())
+    elif barrier == 'side-input':
+      new_pc_list.append(pc | label >> SideInputBarrier())
+    elif barrier == 'expand-gbk':
+      new_pc_list.extend(expand_using_gbk(label, pc))
+    elif barrier == 'expand-second-output':
+      new_pc_list.extend(expand_using_second_output(label, pc))
+    elif barrier in ('merge-gbk', 'merge-side-input'):
+      # Inputs are merged pairwise: every even entry absorbs its successor.
+      if pc_no % 2 != 0:
+        continue
+      merge_fn = (
+          merge_using_gbk if barrier == 'merge-gbk'
+          else merge_using_side_input)
+      new_pc_list.append(merge_fn(label, pc, pc_list[pc_no + 1]))
+  return new_pc_list
+
+
+def run(argv=None):
+  """Runs the workflow.
+
+  Args:
+    argv: command line arguments; arguments not recognized by ``parse_args``
+      are used to build the ``PipelineOptions``.
+
+  Raises:
+    ValueError: if '--input' or '--steps' is not provided.
+  """
+  known_args, pipeline_args = parse_args(argv)
+
+  # Fail early with a clear message instead of an obscure TypeError later on.
+  if known_args.input is None:
+    raise ValueError('--input must be provided.')
+  if known_args.steps is None:
+    raise ValueError('--steps must be provided.')
+
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+
+  steps = known_args.steps
+  barrier = known_args.barrier
+
+  with TestPipeline(options=pipeline_options) as p:
+    source = SyntheticSource(known_args.input)
+
+    # Merge barriers halve the number of PCollections in front of every step
+    # except the first one, so 2^(num_steps - 1) reads are needed to end up
+    # with a single PCollection. The exponent is clamped for an empty step
+    # list, where 2 ** -1 would produce a float.
+    if barrier in ('merge-gbk', 'merge-side-input'):
+      num_roots = 2 ** max(len(steps) - 1, 0)
+    else:
+      num_roots = 1
+    pc_list = [p | ('Read %d' % read_no) >> beam.io.Read(source)
+               for read_no in range(num_roots)]
+
+    for step_no, step in enumerate(steps):
+      # A barrier separates consecutive steps; the first step reads directly.
+      if step_no != 0:
+        pc_list = _apply_barrier(barrier, step_no, pc_list)
+
+      pc_list = [
+          pc | 'SyntheticStep %d.%d' % (step_no, pc_no) >> beam.ParDo(
+              SyntheticStep(
+                  per_element_delay_sec=step['per_element_delay'],
+                  per_bundle_delay_sec=step['per_bundle_delay'],
+                  output_records_per_input_record=
+                  step['output_records_per_input_record'],
+                  output_filter_ratio=
+                  step['output_filter_ratio']))
+          for pc_no, pc in enumerate(pc_list)]
+
+    # pylint: disable=expression-not-assigned
+    if known_args.output:
+      # If an output location is provided we format and write output. Only a
+      # single resulting PCollection can be written.
+      if len(pc_list) == 1:
+        (pc_list[0] |
+         'FormatOutput' >> beam.Map(lambda elm: (elm[0] + elm[1])) |
+         'WriteOutput' >> WriteToText(known_args.output))
+      else:
+        logging.warning(
+            'Ignoring --output: the pipeline ends with %d PCollections but '
+            'output is only written for a single one.', len(pc_list))
+
+  logging.info('Pipeline run completed.')
+
+
+if __name__ == '__main__':
+  # Make INFO level messages visible when executed as a script.
+  root_logger = logging.getLogger()
+  root_logger.setLevel(logging.INFO)
+  run()
diff --git a/sdks/python/apache_beam/testing/synthetic_pipeline_test.py b/sdks/python/apache_beam/testing/synthetic_pipeline_test.py
new file mode 100644
index 0000000..fe5e94a
--- /dev/null
+++ b/sdks/python/apache_beam/testing/synthetic_pipeline_test.py
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tests for apache_beam.testing.synthetic_pipeline."""
+
+from __future__ import absolute_import
+
+import glob
+import json
+import logging
+import tempfile
+import time
+import unittest
+
+import apache_beam as beam
+from apache_beam.io import source_test_utils
+from apache_beam.testing import synthetic_pipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+try:
+  import numpy as np
+except ImportError:
+  np = None
+
+
+def input_spec(num_records, key_size, value_size,
+               bundle_size_distribution_type='const',
+               bundle_size_distribution_param=0,
+               force_initial_num_bundles=0):
+  """Builds the dictionary describing a synthetic source for the tests.
+
+  Args:
+    num_records: stored under 'numRecords'.
+    key_size: stored under 'keySizeBytes'.
+    value_size: stored under 'valueSizeBytes'.
+    bundle_size_distribution_type: 'type' of 'bundleSizeDistribution'.
+    bundle_size_distribution_param: 'param' of 'bundleSizeDistribution'.
+    force_initial_num_bundles: stored under 'forceNumInitialBundles'.
+
+  Returns:
+    The source specification as a dictionary.
+  """
+  bundle_size_distribution = {
+      'type': bundle_size_distribution_type,
+      'param': bundle_size_distribution_param,
+  }
+  spec = {}
+  spec['numRecords'] = num_records
+  spec['keySizeBytes'] = key_size
+  spec['valueSizeBytes'] = value_size
+  spec['bundleSizeDistribution'] = bundle_size_distribution
+  spec['forceNumInitialBundles'] = force_initial_num_bundles
+  return spec
+
+
+@unittest.skipIf(np is None, 'Synthetic source dependencies are not installed')
+class SyntheticPipelineTest(unittest.TestCase):
+  # Tests for the synthetic step, the synthetic source and the pipelines
+  # assembled by synthetic_pipeline.run. Plain comments are used instead of
+  # docstrings so that test runners keep reporting the method names.
+
+  # pylint: disable=expression-not-assigned
+
+  def testSyntheticStep(self):
+    # Ten inputs through a step created with arguments (0, 0.5, 10) must
+    # produce 100 outputs within the time bounds asserted below.
+    start = time.time()
+    with beam.Pipeline() as p:
+      pcoll = p | beam.Create(list(range(10))) | beam.ParDo(
+          synthetic_pipeline.SyntheticStep(0, 0.5, 10))
+      assert_that(
+          pcoll | beam.combiners.Count.Globally(), equal_to([100]))
+
+    elapsed = time.time() - start
+    # TODO(chamikaramj): Fix the flaky time based bounds.
+    self.assertTrue(0.5 <= elapsed <= 3, elapsed)
+
+  def testSyntheticSource(self):
+    # The source must emit the requested number of records with keys and
+    # values of the requested sizes.
+    def assert_size(element, expected_size):
+      assert len(element) == expected_size
+    with beam.Pipeline() as p:
+      pcoll = (
+          p | beam.io.Read(
+              synthetic_pipeline.SyntheticSource(input_spec(300, 5, 15))))
+      (pcoll
+       | beam.Map(lambda elm: elm[0]) | 'key' >> beam.Map(assert_size, 5))
+      (pcoll
+       | beam.Map(lambda elm: elm[1]) | 'value' >> beam.Map(assert_size, 15))
+      assert_that(pcoll | beam.combiners.Count.Globally(),
+                  equal_to([300]))
+
+  def testSyntheticSourceSplitEven(self):
+    # A 'const' bundle size distribution must yield 20 splits that together
+    # read the same data as the unsplit source.
+    source = synthetic_pipeline.SyntheticSource(
+        input_spec(1000, 1, 1, 'const', 0))
+    splits = source.split(100)
+    sources_info = [(split.source, split.start_position, split.stop_position)
+                    for split in splits]
+    self.assertEqual(20, len(sources_info))
+    source_test_utils.assert_sources_equal_reference_source(
+        (source, None, None), sources_info)
+
+  def testSyntheticSourceSplitUneven(self):
+    # With 10 forced initial bundles and a 'zipf' distribution the source
+    # must yield exactly 10 splits covering the same data.
+    source = synthetic_pipeline.SyntheticSource(
+        input_spec(1000, 1, 1, 'zipf', 3, 10))
+    splits = source.split(100)
+    sources_info = [(split.source, split.start_position, split.stop_position)
+                    for split in splits]
+    self.assertEqual(10, len(sources_info))
+    source_test_utils.assert_sources_equal_reference_source(
+        (source, None, None), sources_info)
+
+  def testSplitAtFraction(self):
+    source = synthetic_pipeline.SyntheticSource(input_spec(10, 1, 1))
+    source_test_utils.assert_split_at_fraction_exhaustive(source)
+    source_test_utils.assert_split_at_fraction_fails(source, 5, 0.3)
+    source_test_utils.assert_split_at_fraction_succeeds_and_consistent(
+        source, 1, 0.3)
+
+  def run_pipeline(self, barrier, writes_output=True):
+    # Runs a two step pipeline with the given barrier type and, when
+    # writes_output is set, verifies that 10 lines were written.
+    #
+    # The step parser reads 'per_element_delay_msec'; any other key would be
+    # silently ignored and the configured delay never applied.
+    steps = [{'per_element_delay_msec': 1}, {'per_element_delay_msec': 1}]
+    args = ['--barrier=%s' % barrier, '--runner=DirectRunner',
+            '--steps=%s' % json.dumps(steps),
+            '--input=%s' % json.dumps(input_spec(10, 1, 1))]
+    if writes_output:
+      output_location = tempfile.NamedTemporaryFile().name
+      args.append('--output=%s' % output_location)
+
+    synthetic_pipeline.run(args)
+
+    # Verify output
+    if writes_output:
+      read_output = []
+      # The written files are matched by using the location as a prefix.
+      for file_name in glob.glob(output_location + '*'):
+        with open(file_name, 'r') as f:
+          read_output.extend(f.read().splitlines())
+
+      self.assertEqual(10, len(read_output))
+
+  def testPipelineShuffle(self):
+    self.run_pipeline('shuffle')
+
+  def testPipelineSideInput(self):
+    self.run_pipeline('side-input')
+
+  # The expand barriers end with more than one PCollection, for which the
+  # pipeline writes no output, so only a successful run is checked.
+  def testPipelineExpandGBK(self):
+    self.run_pipeline('expand-gbk', False)
+
+  def testPipelineExpandSideOutput(self):
+    self.run_pipeline('expand-second-output', False)
+
+  def testPipelineMergeGBK(self):
+    self.run_pipeline('merge-gbk')
+
+  def testPipelineMergeSideInput(self):
+    self.run_pipeline('merge-side-input')
+
+
+if __name__ == '__main__':
+  # Make INFO level messages visible when the tests are run as a script.
+  root_logger = logging.getLogger()
+  root_logger.setLevel(logging.INFO)
+  unittest.main()
diff --git a/sdks/python/scripts/generate_pydoc.sh b/sdks/python/scripts/generate_pydoc.sh
index 0e5bfbb..2f6f0f1 100755
--- a/sdks/python/scripts/generate_pydoc.sh
+++ b/sdks/python/scripts/generate_pydoc.sh
@@ -178,6 +178,7 @@ nitpicky = True
 nitpick_ignore = []
 nitpick_ignore += [('py:class', iden) for iden in ignore_identifiers]
 nitpick_ignore += [('py:obj', iden) for iden in ignore_identifiers]
+nitpick_ignore += [('py:exc', 'ValueError')]
 EOF
 
 #=== index.rst ===#
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 46c8050..43df52b 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -128,6 +128,10 @@ REQUIRED_TEST_PACKAGES = [
     'pyhamcrest>=1.9,<2.0',
     ]
 
+REQUIRED_PERF_TEST_PACKAGES = [
+    'numpy>=1.14.3',
+]
+
 GCP_REQUIREMENTS = [
     # oauth2client >=4 only works with google-apitools>=0.5.18.
     'google-apitools>=0.5.18,<=0.5.20',
@@ -192,7 +196,8 @@ setuptools.setup(
     extras_require={
         'docs': ['Sphinx>=1.5.2,<2.0'],
         'test': REQUIRED_TEST_PACKAGES,
-        'gcp': GCP_REQUIREMENTS
+        'gcp': GCP_REQUIREMENTS,
+        'perftest': REQUIRED_PERF_TEST_PACKAGES,
     },
     zip_safe=False,
     # PyPI package information.
diff --git a/settings.gradle b/settings.gradle
index 90bf3f9..a8ed673 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -152,6 +152,8 @@ include "beam-sdks-java-io-tika"
 project(":beam-sdks-java-io-tika").dir = file("sdks/java/io/tika")
 include "beam-sdks-java-io-xml"
 project(":beam-sdks-java-io-xml").dir = file("sdks/java/io/xml")
+include "beam-sdks-java-io-synthetic"
+project(":beam-sdks-java-io-synthetic").dir = file("sdks/java/io/synthetic")
 include "beam-sdks-java-javadoc"
 project(":beam-sdks-java-javadoc").dir = file("sdks/java/javadoc")
 include "beam-sdks-java-maven-archetypes-examples"