You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2018/07/15 14:56:39 UTC
[beam] branch master updated: [BEAM-4432] Adding Sources to produce
Synthetic output for Batch pipelines (#5519)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b5e8335 [BEAM-4432] Adding Sources to produce Synthetic output for Batch pipelines (#5519)
b5e8335 is described below
commit b5e8335d982ee69d9f788f65f27356cddd5293d1
Author: Pablo <pa...@users.noreply.github.com>
AuthorDate: Sun Jul 15 07:56:33 2018 -0700
[BEAM-4432] Adding Sources to produce Synthetic output for Batch pipelines (#5519)
* Adding Sources to produce Synthetic output for Batch pipelines
---
.../org/apache/beam/gradle/BeamModulePlugin.groovy | 1 +
sdks/java/io/synthetic/build.gradle | 36 ++
.../beam/sdk/io/synthetic/SyntheticBoundedIO.java | 437 ++++++++++++++++++
.../beam/sdk/io/synthetic/SyntheticOptions.java | 355 +++++++++++++++
.../beam/sdk/io/synthetic/SyntheticUtils.java | 100 ++++
.../apache/beam/sdk/io/synthetic/package-info.java | 19 +
.../sdk/io/synthetic/SyntheticBoundedIOTest.java | 215 +++++++++
.../apache_beam/testing/synthetic_pipeline.py | 502 +++++++++++++++++++++
.../apache_beam/testing/synthetic_pipeline_test.py | 154 +++++++
sdks/python/scripts/generate_pydoc.sh | 1 +
sdks/python/setup.py | 7 +-
settings.gradle | 2 +
12 files changed, 1828 insertions(+), 1 deletion(-)
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index da0a875..c92f23c 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -341,6 +341,7 @@ class BeamModulePlugin implements Plugin<Project> {
commons_io_1x : "commons-io:commons-io:1.3.2",
commons_io_2x : "commons-io:commons-io:2.5",
commons_lang3 : "org.apache.commons:commons-lang3:3.6",
+ commons_math3 : "org.apache.commons:commons-math3:3.6.1",
datastore_v1_proto_client : "com.google.cloud.datastore:datastore-v1-proto-client:1.4.0",
datastore_v1_protos : "com.google.cloud.datastore:datastore-v1-protos:1.3.0",
error_prone_annotations : "com.google.errorprone:error_prone_annotations:2.0.15",
diff --git a/sdks/java/io/synthetic/build.gradle b/sdks/java/io/synthetic/build.gradle
new file mode 100644
index 0000000..3d6be8e
--- /dev/null
+++ b/sdks/java/io/synthetic/build.gradle
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: org.apache.beam.gradle.BeamModulePlugin
+applyJavaNature()
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Synthetic"
+ext.summary = "Generators of Synthetic IO for Testing."
+
+dependencies {
+ compile library.java.joda_time
+ compile library.java.commons_math3
+ shadow library.java.jackson_core
+ shadow library.java.jackson_annotations
+ shadow library.java.jackson_databind
+ testCompile library.java.guava
+ testCompile library.java.junit
+ testCompile library.java.hamcrest_core
+ testCompile library.java.hamcrest_library
+ shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
+}
diff --git a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedIO.java b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedIO.java
new file mode 100644
index 0000000..d9f652a
--- /dev/null
+++ b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedIO.java
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.synthetic;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.commons.math3.stat.StatUtils.sum;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.google.common.base.MoreObjects;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.io.OffsetBasedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.math3.distribution.ConstantRealDistribution;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This {@link SyntheticBoundedIO} class provides a parameterizable batch custom source that is
+ * deterministic.
+ *
+ * <p>The {@link SyntheticBoundedSource} generates a {@link PCollection} of {@code KV<byte[],
+ * byte[]>}. A fraction of the generated records {@code KV<byte[], byte[]>} are associated with
+ * "hot" keys, which are uniformly distributed over a fixed number of hot keys. The remaining
+ * generated records are associated with "random" keys. Each record will be slowed down by a certain
+ * sleep time generated based on the specified sleep time distribution when the {@link
+ * SyntheticSourceReader} reads each record. The record {@code KV<byte[], byte[]>} is generated
+ * deterministically based on the record's position in the source, which enables repeatable
+ * execution for debugging. The SyntheticBoundedIO configurable parameters are defined in {@link
+ * SyntheticBoundedIO.SyntheticSourceOptions}.
+ *
+ * <p>To read a {@link PCollection} of {@code KV<byte[], byte[]>} from {@link SyntheticBoundedIO},
+ * use {@link SyntheticBoundedIO#readFrom} to construct the synthetic source with synthetic source
+ * options. See {@link SyntheticBoundedIO.SyntheticSourceOptions} for how to construct an instance.
+ * An example is below:
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ * SyntheticBoundedIO.SyntheticSourceOptions sso = ...;
+ *
+ * // Construct the synthetic input with synthetic source options.
+ * PCollection<KV<byte[], byte[]>> input = p.apply(SyntheticBoundedIO.readFrom(sso));
+ * }</pre>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class SyntheticBoundedIO {
+ /** Read from the synthetic source options. */
+ public static Read.Bounded<KV<byte[], byte[]>> readFrom(SyntheticSourceOptions options) {
+ checkNotNull(options, "Input synthetic source options should not be null.");
+ return Read.from(new SyntheticBoundedSource(options));
+ }
+
+ /** A {@link SyntheticBoundedSource} that reads {@code KV<byte[], byte[]>}. */
+ public static class SyntheticBoundedSource extends OffsetBasedSource<KV<byte[], byte[]>> {
+ private static final long serialVersionUID = 0;
+ private static final Logger LOG = LoggerFactory.getLogger(SyntheticBoundedSource.class);
+
+ private final SyntheticSourceOptions sourceOptions;
+
+ public SyntheticBoundedSource(SyntheticSourceOptions sourceOptions) {
+ this(0, sourceOptions.numRecords, sourceOptions);
+ }
+
+ SyntheticBoundedSource(long startOffset, long endOffset, SyntheticSourceOptions sourceOptions) {
+ super(startOffset, endOffset, 1);
+ this.sourceOptions = sourceOptions;
+ LOG.debug("Constructing {}", toString());
+ }
+
+ @Override
+ public Coder<KV<byte[], byte[]>> getDefaultOutputCoder() {
+ return KvCoder.of(ByteArrayCoder.of(), ByteArrayCoder.of());
+ }
+
+ @Override
+ // TODO: test cases where the source size could not be estimated (i.e., return 0).
+ // TODO: test cases where the key size and value size might differ from record to record.
+ // The key size and value size might have their own distributions.
+ public long getBytesPerOffset() {
+ return sourceOptions.bytesPerRecord >= 0
+ ? sourceOptions.bytesPerRecord
+ : sourceOptions.keySizeBytes + sourceOptions.valueSizeBytes;
+ }
+
+ @Override
+ public void validate() {
+ super.validate();
+ sourceOptions.validate();
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("options", sourceOptions)
+ .add("offsetRange", "[" + getStartOffset() + ", " + getEndOffset() + ")")
+ .toString();
+ }
+
+ @Override
+ public final SyntheticBoundedSource createSourceForSubrange(long start, long end) {
+ checkArgument(
+ start >= getStartOffset(),
+ "Start offset value "
+ + start
+ + " of the subrange cannot be smaller than the start offset value "
+ + getStartOffset()
+ + " of the parent source");
+ checkArgument(
+ end <= getEndOffset(),
+ "End offset value "
+ + end
+ + " of the subrange cannot be larger than the end offset value "
+ + getEndOffset()
+ + " of the parent source");
+
+ return new SyntheticBoundedSource(start, end, sourceOptions);
+ }
+
+ @Override
+ public long getMaxEndOffset(PipelineOptions options) {
+ return getEndOffset();
+ }
+
+ @Override
+ public SyntheticSourceReader createReader(PipelineOptions pipelineOptions) {
+ return new SyntheticSourceReader(this);
+ }
+
+ /**
+  * Splits this source into its initial bundles.
+  *
+  * <p>The number of bundles is {@code sourceOptions.forceNumInitialBundles} when that option is
+  * set; otherwise it is the estimated source size divided by {@code desiredBundleSizeBytes},
+  * rounded up, and never less than one. The offset range is then partitioned by {@code
+  * generateBundleSizes}.
+  *
+  * @param desiredBundleSizeBytes size hint, in bytes, for each bundle; a non-positive hint
+  *     carries no sizing information and results in a single requested bundle
+  * @param options pipeline options, passed on to the size estimation
+  * @return one sub-source per non-empty generated offset range
+  */
+ @Override
+ public List<SyntheticBoundedSource> split(long desiredBundleSizeBytes, PipelineOptions options)
+     throws Exception {
+   int desiredNumBundles;
+   if (sourceOptions.forceNumInitialBundles != null) {
+     // An explicit bundle count always wins over the size-based hint.
+     desiredNumBundles = sourceOptions.forceNumInitialBundles;
+   } else if (desiredBundleSizeBytes <= 0) {
+     // Dividing by a non-positive hint would give an infinite or negative bundle count and
+     // fail when the array of relative sizes is allocated.
+     desiredNumBundles = 1;
+   } else {
+     // Request at least one bundle even when the estimated size is 0 (e.g. bytesPerRecord set
+     // to 0); with zero bundles no range is generated and every record would be dropped.
+     desiredNumBundles =
+         Math.max(
+             1, (int) Math.ceil(1.0 * getEstimatedSizeBytes(options) / desiredBundleSizeBytes));
+   }
+
+   List<SyntheticBoundedSource> res =
+       generateBundleSizes(desiredNumBundles)
+           .stream()
+           .map(
+               offsetRange ->
+                   createSourceForSubrange(offsetRange.getFrom(), offsetRange.getTo()))
+           .collect(Collectors.toList());
+   LOG.info("Split into {} bundles of sizes: {}", res.size(), res);
+   return res;
+ }
+
+ /**
+  * Cuts {@code [getStartOffset(), getEndOffset())} into consecutive, non-empty offset ranges
+  * whose lengths are proportional to samples of {@code sourceOptions.bundleSizeDistribution}.
+  *
+  * <p>Sample {@code i} is drawn with the hash of {@code i} as its seed. Ranges that would be
+  * empty after rounding are dropped, so fewer than {@code desiredNumBundles} ranges may be
+  * returned.
+  *
+  * @param desiredNumBundles number of relative sizes to sample; 0 yields an empty list
+  * @return the non-empty ranges in increasing offset order; when at least one bundle is
+  *     requested they cover the whole range of this source
+  */
+ private List<OffsetRange> generateBundleSizes(int desiredNumBundles) {
+   List<OffsetRange> result = new ArrayList<>();
+   long rangeStart = getStartOffset();
+   long rangeEnd = getEndOffset();
+
+   // Generate relative bundle sizes using the given distribution.
+   double[] relativeSizes = new double[desiredNumBundles];
+   for (int i = 0; i < relativeSizes.length; ++i) {
+     relativeSizes[i] =
+         sourceOptions.bundleSizeDistribution.sample(
+             sourceOptions.hashFunction.hashInt(i).asLong());
+   }
+
+   double s = sum(relativeSizes);
+   if (!(s > 0)) {
+     // A zero, negative or NaN total makes the proportions undefined (the boundary expression
+     // below would evaluate to NaN and be cast to offset 0). Fall back to a single bundle.
+     if (relativeSizes.length > 0 && rangeStart != rangeEnd) {
+       result.add(new OffsetRange(rangeStart, rangeEnd));
+     }
+     return result;
+   }
+
+   // Generate offset ranges proportional to the relative sizes.
+   long startOffset = rangeStart;
+   double sizeSoFar = 0;
+   for (int i = 0; i < relativeSizes.length; ++i) {
+     sizeSoFar += relativeSizes[i];
+     long endOffset;
+     if (i == relativeSizes.length - 1) {
+       // The last bundle always ends exactly at the end of the source.
+       endOffset = rangeEnd;
+     } else {
+       long proportional = (long) (rangeStart + sizeSoFar * (rangeEnd - rangeStart) / s);
+       // A negative sample can move the boundary backwards or past the end; clamp it so every
+       // range stays inside the source and never runs backwards. Non-negative samples are
+       // unaffected.
+       endOffset = Math.max(startOffset, Math.min(rangeEnd, proportional));
+     }
+     if (startOffset != endOffset) {
+       result.add(new OffsetRange(startOffset, endOffset));
+     }
+     startOffset = endOffset;
+   }
+   return result;
+ }
+ }
+
+ /**
+ * Shape of the progress reporting curve as a function of the current offset in the {@link
+ * SyntheticBoundedSource}.
+ */
+ public enum ProgressShape {
+ /** Reported progress grows linearly from 0 to 1. */
+ LINEAR,
+ /** Reported progress decreases linearly from 0.9 to 0.1. */
+ LINEAR_REGRESSING,
+ }
+
+ /**
+ * Synthetic bounded source options. These options are all JSON, see documentations of individual
+ * fields for details. {@code SyntheticSourceOptions} uses jackson annotations which
+ * PipelineOptionsFactory can use to parse and construct an instance.
+ */
+ public static class SyntheticSourceOptions extends SyntheticOptions {
+ private static final long serialVersionUID = 0;
+
+ /** Total number of generated records. */
+ @JsonProperty public long numRecords;
+
+ /**
+ * Only records whose index is a multiple of this will be split points. 0 means the source is
+ * not dynamically splittable (but is perfectly statically splittable). In that case it also
+ * doesn't report progress at all.
+ */
+ @JsonProperty public long splitPointFrequencyRecords = 1;
+
+ /**
+ * Distribution for generating initial split bundles.
+ *
+ * <p>When splitting into "desiredBundleSizeBytes", we'll compute the desired number of bundles
+ * N, then sample this many numbers from this distribution, normalize their sum to 1, and use
+ * that as the boundaries of generated bundles.
+ *
+ * <p>The Zipf distribution is expected to be particularly useful here.
+ *
+ * <p>E.g., empirically, with 100 bundles, the Zipf distribution with a parameter of 3.5 will
+ * generate bundles where the largest is about 3x-10x larger than the median; with a parameter
+ * of 3.0 this ratio will be about 5x-50x; with 2.5, 5x-100x (i.e. 1 bundle can be as large as
+ * all others combined).
+ */
+ @JsonDeserialize(using = SamplerDeserializer.class)
+ public Sampler bundleSizeDistribution = fromRealDistribution(new ConstantRealDistribution(1));
+
+ /**
+ * If specified, this source will split into exactly this many bundles regardless of the hints
+ * provided by the service.
+ */
+ @JsonProperty public Integer forceNumInitialBundles;
+
+ /** See {@link ProgressShape}. */
+ @JsonProperty public ProgressShape progressShape = ProgressShape.LINEAR;
+
+ /**
+ * The distribution for the delay when reading from synthetic source starts. This delay is
+ * independent of the per-record delay and uses the same types of distributions as the {@code
+ * delayDistribution} option of {@link SyntheticOptions}.
+ */
+ @JsonDeserialize(using = SamplerDeserializer.class)
+ final Sampler initializeDelayDistribution =
+ fromRealDistribution(new ConstantRealDistribution(0));
+
+ /**
+ * Generates a random delay value for the synthetic source initialization using the distribution
+ * defined by {@link #initializeDelayDistribution}.
+ */
+ Duration nextInitializeDelay(long seed) {
+ return Duration.millis((long) initializeDelayDistribution.sample(seed));
+ }
+
+ @Override
+ public void validate() {
+ super.validate();
+ checkArgument(
+ numRecords >= 0, "numRecords should be a non-negative number, but found %s.", numRecords);
+ checkNotNull(bundleSizeDistribution, "bundleSizeDistribution");
+ checkArgument(
+ forceNumInitialBundles == null || forceNumInitialBundles > 0,
+ "forceNumInitialBundles, if specified, must be positive, but found %s",
+ forceNumInitialBundles);
+ checkArgument(
+ splitPointFrequencyRecords >= 0,
+ "splitPointFrequencyRecords must be non-negative, but found %s",
+ splitPointFrequencyRecords);
+ }
+
+ public Record genRecord(long position) {
+ // This method is supposed to generate random records deterministically,
+ // so that results can be reproduced by running the same scenario a second time.
+ // We need to initiate a Random object for each position to make the record deterministic
+ // because liquid sharding could split the Source at any position.
+ // And we also need a seed to initiate a Random object. The mapping from the position to
+ // the seed should be fixed. Using the position as seed to feed Random objects will cause the
+ // generated values to not be random enough because the position values are
+ // close to each other. To make seeds fed into the Random objects unrelated,
+ // we use a hashing function to map the position to its corresponding hashcode,
+ // and use the hashcode as a seed to feed into the Random object.
+ long hashCodeOfPosition = hashFunction.hashLong(position).asLong();
+ return new Record(genKvPair(hashCodeOfPosition), nextDelay(hashCodeOfPosition));
+ }
+
+ /** Record generated by {@link #genRecord}. */
+ public static class Record {
+ public final KV<byte[], byte[]> kv;
+ public final Duration sleepMsec;
+
+ Record(KV<byte[], byte[]> kv, long sleepMsec) {
+ this.kv = kv;
+ this.sleepMsec = new Duration(sleepMsec);
+ }
+ }
+ }
+
+ /**
+ * A reader over the {@link PCollection} of {@code KV<byte[], byte[]>} from the synthetic source.
+ *
+ * <p>The random but deterministic record at position "i" in the range [A, B) is generated by
+ * using {@link SyntheticSourceOptions#genRecord}. Reading each record sleeps according to the
+ * sleep time distribution in {@code SyntheticOptions}.
+ */
+ private static class SyntheticSourceReader
+ extends OffsetBasedSource.OffsetBasedReader<KV<byte[], byte[]>> {
+ private final long splitPointFrequencyRecords;
+
+ private KV<byte[], byte[]> currentKvPair;
+ private long currentOffset;
+ private boolean isAtSplitPoint;
+
+ SyntheticSourceReader(SyntheticBoundedSource source) {
+ super(source);
+ this.currentKvPair = null;
+ this.splitPointFrequencyRecords = source.sourceOptions.splitPointFrequencyRecords;
+ }
+
+ @Override
+ public synchronized SyntheticBoundedSource getCurrentSource() {
+ return (SyntheticBoundedSource) super.getCurrentSource();
+ }
+
+ @Override
+ protected long getCurrentOffset() throws IllegalStateException {
+ return currentOffset;
+ }
+
+ @Override
+ public KV<byte[], byte[]> getCurrent() throws NoSuchElementException {
+ if (currentKvPair == null) {
+ throw new NoSuchElementException(
+ "The current element is unavailable because either the reader is "
+ + "at the beginning of the input and start() or advance() wasn't called, "
+ + "or the last start() or advance() returned false.");
+ }
+ return currentKvPair;
+ }
+
+ @Override
+ public boolean allowsDynamicSplitting() {
+ return splitPointFrequencyRecords > 0;
+ }
+
+ @Override
+ protected final boolean startImpl() throws IOException {
+ this.currentOffset = getCurrentSource().getStartOffset();
+ if (splitPointFrequencyRecords > 0) {
+ while (currentOffset % splitPointFrequencyRecords != 0) {
+ ++currentOffset;
+ }
+ }
+
+ SyntheticSourceOptions options = getCurrentSource().sourceOptions;
+ SyntheticUtils.delay(
+ options.nextInitializeDelay(this.currentOffset),
+ options.cpuUtilizationInMixedDelay,
+ options.delayType,
+ new Random(this.currentOffset));
+
+ isAtSplitPoint = true;
+ --currentOffset;
+ return advanceImpl();
+ }
+
+ @Override
+ protected boolean advanceImpl() {
+ currentOffset++;
+ isAtSplitPoint =
+ (splitPointFrequencyRecords == 0) || (currentOffset % splitPointFrequencyRecords == 0);
+
+ SyntheticSourceOptions options = getCurrentSource().sourceOptions;
+ SyntheticSourceOptions.Record record = options.genRecord(currentOffset);
+ currentKvPair = record.kv;
+ // TODO: add a separate distribution for the sleep time of reading the first record
+ // (e.g.,"open" the files).
+ long hashCodeOfVal = options.hashFunction.hashBytes(currentKvPair.getValue()).asLong();
+ Random random = new Random(hashCodeOfVal);
+ SyntheticUtils.delay(
+ record.sleepMsec, options.cpuUtilizationInMixedDelay, options.delayType, random);
+
+ return true;
+ }
+
+ @Override
+ public void close() {
+ // Nothing
+ }
+
+ @Override
+ public Double getFractionConsumed() {
+ double realFractionConsumed = super.getFractionConsumed();
+ ProgressShape shape = getCurrentSource().sourceOptions.progressShape;
+ switch (shape) {
+ case LINEAR:
+ return realFractionConsumed;
+ case LINEAR_REGRESSING:
+ return 0.9 - 0.8 * realFractionConsumed;
+ default:
+ throw new AssertionError("Unexpected progress shape: " + shape);
+ }
+ }
+
+ @Override
+ protected boolean isAtSplitPoint() throws NoSuchElementException {
+ return isAtSplitPoint;
+ }
+ }
+}
diff --git a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticOptions.java b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticOptions.java
new file mode 100644
index 0000000..b74fc06
--- /dev/null
+++ b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticOptions.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.synthetic;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import org.apache.beam.sdk.values.KV;
+import org.apache.commons.math3.distribution.ConstantRealDistribution;
+import org.apache.commons.math3.distribution.ExponentialDistribution;
+import org.apache.commons.math3.distribution.IntegerDistribution;
+import org.apache.commons.math3.distribution.NormalDistribution;
+import org.apache.commons.math3.distribution.RealDistribution;
+import org.apache.commons.math3.distribution.UniformRealDistribution;
+import org.apache.commons.math3.distribution.ZipfDistribution;
+
+/**
+ * This {@link SyntheticOptions} class provides common parameterizable synthetic options that are
+ * used by {@link SyntheticBoundedIO}.
+ */
+public class SyntheticOptions implements Serializable {
+ private static final long serialVersionUID = 0;
+
+ /**
+ * The type of Delay that will be produced.
+ *
+ * <p>CPU delay produces a CPU-busy delay. SLEEP delay makes the process sleep.
+ */
+ public enum DelayType {
+ SLEEP,
+ CPU,
+ MIXED,
+ }
+
+ /** Mapper for (de)serializing JSON. */
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ /**
+ * Wrapper over a distribution. Unfortunately commons-math does not provide a common interface
+ * over both RealDistribution and IntegerDistribution, and we sometimes need one and sometimes the
+ * other.
+ */
+ public interface Sampler extends Serializable {
+ double sample(long seed);
+
+ // Make this class a bean, so Jackson can serialize it during SyntheticOptions.toString().
+ Object getDistribution();
+ }
+
+ public static Sampler fromRealDistribution(final RealDistribution dist) {
+ return new Sampler() {
+ private static final long serialVersionUID = 0L;
+
+ @Override
+ public double sample(long seed) {
+ dist.reseedRandomGenerator(seed);
+ return dist.sample();
+ }
+
+ @Override
+ public Object getDistribution() {
+ return dist;
+ }
+ };
+ }
+
+ public static Sampler fromIntegerDistribution(final IntegerDistribution dist) {
+ return new Sampler() {
+ private static final long serialVersionUID = 0L;
+
+ @Override
+ public double sample(long seed) {
+ dist.reseedRandomGenerator(seed);
+ return dist.sample();
+ }
+
+ @Override
+ public Object getDistribution() {
+ return dist;
+ }
+ };
+ }
+
+ private static Sampler scaledSampler(final Sampler sampler, final double multiplier) {
+ return new Sampler() {
+ private static final long serialVersionUID = 0L;
+
+ @Override
+ public double sample(long seed) {
+ return multiplier * sampler.sample(seed);
+ }
+
+ @Override
+ public Object getDistribution() {
+ return sampler.getDistribution();
+ }
+ };
+ }
+
+ /** The key size in bytes. */
+ @JsonProperty public long keySizeBytes = 1;
+
+ /** The value size in bytes. */
+ @JsonProperty public long valueSizeBytes = 1;
+
+ /**
+ * The size of a single record used for size estimation in bytes. If less than zero, keySizeBytes
+ * + valueSizeBytes is used.
+ */
+ @JsonProperty public final long bytesPerRecord;
+
+ /** The number of distinct "hot" keys. */
+ @JsonProperty public long numHotKeys;
+
+ /**
+ * The fraction of records associated with "hot" keys, which are uniformly distributed over a
+ * fixed number of hot keys.
+ */
+ @JsonProperty public double hotKeyFraction;
+
+ /** The fraction of keys that should be larger than others. */
+ @JsonProperty public double largeKeyFraction = 0.0;
+
+ /** The size of large keys. */
+ @JsonProperty public double largeKeySizeBytes = 1000;
+
+ /** The seed is used for generating a hash function implementing the 128-bit murmur3 algorithm. */
+ @JsonIgnore public int seed;
+
+ /**
+ * The hash function is used to generate seeds that are fed into the random number generators and
+ * the sleep time distributions.
+ */
+ @JsonIgnore public transient HashFunction hashFunction;
+
+ /**
+ * SyntheticOptions supports several delay distributions including uniform, normal, exponential,
+ * and constant delay per record. The delay is either sleep or CPU spinning for the duration.
+ *
+ * <ul>
+ * <li>The uniform delay distribution is specified through
+ * "delayDistribution":{"type":"uniform","lower":lower_bound,"upper":upper_bound}, where
+ * lower_bound and upper_bound are non-negative numbers representing the delay range in
+ * milliseconds.
+ * <li>The normal delay distribution is specified through
+ * "delayDistribution":{"type":"normal","mean":mean,"stddev":stddev}, where mean is a
+ * non-negative number representing the mean of this normal distributed delay in
+ * milliseconds and stddev is a positive number representing its standard deviation.
+ * <li>The exponential delay distribution is specified through
+ * "delayDistribution":{"type":"exp","mean":mean}, where mean is a positive number
+ * representing the mean of this exponentially distributed delay in milliseconds.
+ * <li>The zipf distribution is specified through
+ * "delayDistribution":{"type":"zipf","param":param,"multiplier":multiplier}, where param is
+ * a number > 1 and multiplier just scales the output of the distribution. By default, the
+ * multiplier is 1. Parameters closer to 1 produce dramatically more skewed results. E.g.
+ * given 100 samples, the min will almost always be 1, while max with param 3 will usually
+ * be below 10; with param 2 max will usually be between several dozen and several hundred;
+ * with param 1.5, thousands to millions.
+ * <li>The constant sleep time per record is specified through
+ * "delayDistribution":{"type":"const","const":const} where const is a non-negative number
+ * representing the constant sleep time in milliseconds.
+ * </ul>
+ *
+ * <p>The field delayDistribution is not used in the synthetic unbounded source. The synthetic
+ * unbounded source uses RateLimiter to control QPS.
+ */
+ @JsonDeserialize(using = SamplerDeserializer.class)
+ private final Sampler delayDistribution = fromRealDistribution(new ConstantRealDistribution(0));
+
+ /**
+ * When 'delayDistribution' is configured, this indicates how the delay is enforced ("SLEEP",
+ * "CPU", or "MIXED").
+ */
+ @JsonProperty public final DelayType delayType = DelayType.SLEEP;
+
+ /**
+ * CPU utilization when delayType is 'MIXED'. This determines the fraction of processing time
+ * spent spinning. The remaining time is spent sleeping. For each millisecond of processing time
+ * we choose to spin with probability equal to this fraction.
+ */
+ @JsonProperty public final double cpuUtilizationInMixedDelay;
+
+ SyntheticOptions() {
+ cpuUtilizationInMixedDelay = 0.1;
+ bytesPerRecord = -1;
+ }
+
+ /**
+  * Stores the seed and builds the 128-bit murmur3 {@code hashFunction} from it.
+  *
+  * @param seed seed of the hash function
+  */
+ @JsonDeserialize
+ public void setSeed(int seed) {
+   this.seed = seed;
+   this.hashFunction = Hashing.murmur3_128(seed);
+ }
+
+ /**
+  * Rebuilds {@code hashFunction} after Java deserialization.
+  *
+  * <p>The field is {@code transient}, so it is not written with the object. Without this hook a
+  * deserialized copy would carry a null hash function. The non-transient {@code seed} is
+  * restored by {@code defaultReadObject()} and is enough to recreate it.
+  */
+ private void readObject(java.io.ObjectInputStream in)
+     throws IOException, ClassNotFoundException {
+   in.defaultReadObject();
+   this.hashFunction = Hashing.murmur3_128(seed);
+ }
+
+ static class SamplerDeserializer extends JsonDeserializer<Sampler> {
+ @Override
+ public Sampler deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException {
+ JsonNode node = jp.getCodec().readTree(jp);
+ String type = node.get("type").asText();
+ switch (type) {
+ case "uniform":
+ {
+ double lowerBound = node.get("lower").asDouble();
+ double upperBound = node.get("upper").asDouble();
+ checkArgument(
+ lowerBound >= 0,
+ "The lower bound of uniform distribution should be a non-negative number, "
+ + "but found %s.",
+ lowerBound);
+ return fromRealDistribution(new UniformRealDistribution(lowerBound, upperBound));
+ }
+ case "exp":
+ {
+ double mean = node.get("mean").asDouble();
+ return fromRealDistribution(new ExponentialDistribution(mean));
+ }
+ case "normal":
+ {
+ double mean = node.get("mean").asDouble();
+ double stddev = node.get("stddev").asDouble();
+ checkArgument(
+ mean >= 0,
+ "The mean of normal distribution should be a non-negative number, but found %s.",
+ mean);
+ return fromRealDistribution(new NormalDistribution(mean, stddev));
+ }
+ case "const":
+ {
+ double constant = node.get("const").asDouble();
+ checkArgument(
+ constant >= 0,
+ "The value of constant distribution should be a non-negative number, but found %s.",
+ constant);
+ return fromRealDistribution(new ConstantRealDistribution(constant));
+ }
+ case "zipf":
+ {
+ double param = node.get("param").asDouble();
+ final double multiplier =
+ node.has("multiplier") ? node.get("multiplier").asDouble() : 1.0;
+ checkArgument(
+ param > 1,
+ "The parameter of the Zipf distribution should be > 1, but found %s.",
+ param);
+ checkArgument(
+ multiplier >= 0,
+ "The multiplier of the Zipf distribution should be >= 0, but found %s.",
+ multiplier);
+ final ZipfDistribution dist = new ZipfDistribution(100, param);
+ return scaledSampler(fromIntegerDistribution(dist), multiplier);
+ }
+ default:
+ {
+ throw new IllegalArgumentException("Unknown distribution type: " + type);
+ }
+ }
+ }
+ }
+
+ /**
+  * Checks that the options describe a usable configuration.
+  *
+  * <p>Besides range checks on the individual fields, enabling hot keys ({@code hotKeyFraction >
+  * 0}) requires a key wide enough to hold an {@code int} and a positive {@code numHotKeys} that
+  * fits in an {@code int}. {@code genKvPair} writes a hashed {@code int} into the key and draws
+  * the hot-key index with {@code random.nextInt((int) numHotKeys)}, which rejects a
+  * non-positive bound.
+  *
+  * @throws IllegalArgumentException if an option is out of range or {@code hashFunction} has
+  *     not been initialized
+  */
+ public void validate() {
+   checkArgument(
+       keySizeBytes > 0, "keySizeBytes should be a positive number, but found %s", keySizeBytes);
+   checkArgument(
+       valueSizeBytes >= 0,
+       "valueSizeBytes should be a non-negative number, but found %s",
+       valueSizeBytes);
+   checkArgument(
+       numHotKeys >= 0, "numHotKeys should be a non-negative number, but found %s", numHotKeys);
+   checkArgument(
+       hotKeyFraction >= 0,
+       "hotKeyFraction should be a non-negative number, but found %s",
+       hotKeyFraction);
+   checkArgument(hashFunction != null, "hashFunction hasn't been initialized.");
+   if (hotKeyFraction > 0) {
+     int intBytes = Integer.SIZE / 8;
+     checkArgument(
+         keySizeBytes >= intBytes,
+         "Allowing hot keys (hotKeyFraction=%s) requires keySizeBytes "
+             + "to be at least %s, but found %s",
+         hotKeyFraction,
+         intBytes,
+         keySizeBytes);
+     // Fail here rather than at record-generation time, where an unusable numHotKeys would
+     // only surface once a record happens to be chosen as a hot-key record.
+     checkArgument(
+         numHotKeys > 0 && numHotKeys <= Integer.MAX_VALUE,
+         "Allowing hot keys (hotKeyFraction=%s) requires numHotKeys "
+             + "to be between 1 and %s, but found %s",
+         hotKeyFraction,
+         Integer.MAX_VALUE,
+         numHotKeys);
+   }
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return MAPPER.writeValueAsString(this);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public long nextDelay(long seed) {
+ return (long) delayDistribution.sample(seed);
+ }
+
+ public KV<byte[], byte[]> genKvPair(long seed) {
+ Random random = new Random(seed);
+
+ byte[] key = new byte[(int) keySizeBytes];
+ // Set the user-key to contain characters other than ordered-code escape characters
+ // (specifically '\0' or '\xff'). The user-key is encoded into the shuffle-key using
+ // ordered-code, and the shuffle-key is then checked for size limit violations. A user-key
+ // consisting of '\0' keySizeBytes would produce a shuffle-key encoding double in size,
+ // which would go over the shuffle-key limit (see b/28770924).
+ for (int i = 0; i < keySizeBytes; ++i) {
+ key[i] = 42;
+ }
+ // Determines whether to generate hot key or not.
+ if (random.nextDouble() < hotKeyFraction) {
+ // Generate hot key.
+ // An integer is randomly selected from the range [0, numHotKeys-1] with equal probability.
+ int randInt = random.nextInt((int) numHotKeys);
+ ByteBuffer.wrap(key).putInt(hashFunction.hashInt(randInt).asInt());
+ } else {
+ // Note that the random generated key might be a hot key.
+ // But the probability of being a hot key is very small.
+ random.nextBytes(key);
+ }
+
+ byte[] val = new byte[(int) valueSizeBytes];
+ random.nextBytes(val);
+ return KV.of(key, val);
+ }
+}
diff --git a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticUtils.java b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticUtils.java
new file mode 100644
index 0000000..507b962
--- /dev/null
+++ b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticUtils.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.synthetic;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.hash.Hashing;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import org.joda.time.Duration;
+
+/** Utility functions used in {@link org.apache.beam.sdk.io.synthetic}. */
+class SyntheticUtils {
+  // cpu delay implementation:
+
+  private static final long MASK = (1L << 16) - 1L;
+  private static final long HASH = 0x243F6A8885A308D3L;
+  private static final long INIT_PLAINTEXT = 50000L;
+
+  /**
+   * Keep cpu busy for {@code delayMillis} by calculating lots of hashes.
+   *
+   * @param delayMillis how long to spin, in milliseconds; non-positive values return immediately.
+   */
+  private static void cpuDelay(long delayMillis) {
+    // Note that the delay is enforced in terms of walltime. That implies this thread may not
+    // keep CPU busy if it gets preempted by other threads. There is more of a chance of this
+    // occurring in a streaming pipeline as there could be lots of threads running this. The loop
+    // measures the time spent in each iteration, so that these effects are somewhat minimized.
+
+    // TimeUnit conversion saturates instead of silently overflowing for very large delays.
+    long cpuMicros = TimeUnit.MILLISECONDS.toMicros(delayMillis);
+    Stopwatch timer = Stopwatch.createUnstarted();
+
+    while (timer.elapsed(TimeUnit.MICROSECONDS) < cpuMicros) {
+      // Find a long which hashes to HASH in lowest MASK bits.
+      // Values chosen to roughly take 1ms on typical workstation.
+      timer.start();
+      long p = INIT_PLAINTEXT;
+      while (true) {
+        long t = Hashing.murmur3_128().hashLong(p).asLong();
+        if ((t & MASK) == (HASH & MASK)) {
+          break;
+        }
+        p++;
+      }
+      timer.stop();
+    }
+  }
+
+  /**
+   * Implements a mechanism to delay a thread in various fashions.
+   *
+   * <ul>
+   *   <li>{@code CPU}: Burn CPU while waiting.
+   *   <li>{@code SLEEP}: Sleep uninterruptibly while waiting.
+   *   <li>{@code MIXED}: Switch between burning CPU and sleeping every millisecond to emulate a
+   *       desired CPU utilization specified by {@code cpuUtilizationInMixedDelay}.
+   * </ul>
+   *
+   * @param delay total time to delay; a non-positive delay does not wait.
+   * @param cpuUtilizationInMixedDelay probability that a given millisecond is spent spinning;
+   *     only used for {@code MIXED}.
+   * @param delayType which delay strategy to use.
+   * @param rnd source of randomness for the per-millisecond choice in {@code MIXED}.
+   * @return Millis spent sleeping (never negative), does not include time spent spinning.
+   * @throws IllegalArgumentException if {@code delayType} is not handled.
+   */
+  static long delay(
+      Duration delay,
+      double cpuUtilizationInMixedDelay,
+      SyntheticOptions.DelayType delayType,
+      Random rnd) {
+    switch (delayType) {
+      case CPU:
+        cpuDelay(delay.getMillis());
+        return 0;
+      case SLEEP:
+        // Report what was actually slept: a negative delay sleeps for zero millis, so it must
+        // not be returned as a negative sleep time.
+        long sleptMillis = Math.max(0L, delay.getMillis());
+        Uninterruptibles.sleepUninterruptibly(sleptMillis, TimeUnit.MILLISECONDS);
+        return sleptMillis;
+      case MIXED:
+        // Mixed mode: for each millisecond of delay randomly choose to spin or sleep.
+        // This is enforced at millisecond granularity since that is the minimum duration that
+        // Thread.sleep() can sleep. Millisecond is also the unit of processing delay.
+        long sleepMillis = 0;
+        for (long i = 0; i < delay.getMillis(); i++) {
+          if (rnd.nextDouble() < cpuUtilizationInMixedDelay) {
+            cpuDelay(1L);
+          } else {
+            Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.MILLISECONDS);
+            sleepMillis++;
+          }
+        }
+        return sleepMillis;
+      default:
+        throw new IllegalArgumentException("Unknown delay type " + delayType);
+    }
+  }
+}
diff --git a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/package-info.java b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/package-info.java
new file mode 100644
index 0000000..d2f9d71
--- /dev/null
+++ b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** Transforms for performing Synthetic Operations in Apache Beam pipelines. */
+package org.apache.beam.sdk.io.synthetic;
diff --git a/sdks/java/io/synthetic/src/test/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedIOTest.java b/sdks/java/io/synthetic/src/test/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedIOTest.java
new file mode 100644
index 0000000..ca648fa
--- /dev/null
+++ b/sdks/java/io/synthetic/src/test/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedIOTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.synthetic;
+
+import static org.apache.beam.sdk.io.synthetic.SyntheticOptions.fromIntegerDistribution;
+import static org.apache.beam.sdk.io.synthetic.SyntheticOptions.fromRealDistribution;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO.SyntheticBoundedSource;
+import org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO.SyntheticSourceOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.SourceTestUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.commons.math3.distribution.ConstantRealDistribution;
+import org.apache.commons.math3.distribution.ZipfDistribution;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link SyntheticBoundedIO}. */
+@RunWith(JUnit4.class)
+public class SyntheticBoundedIOTest {
+  @Rule public final ExpectedException thrown = ExpectedException.none();
+
+  // Baseline options; setUp() re-initializes the fields below before every test, and individual
+  // tests then tweak the ones they care about.
+  private final SyntheticSourceOptions testSourceOptions = new SyntheticSourceOptions();
+
+  @Before
+  public void setUp() {
+    testSourceOptions.splitPointFrequencyRecords = 1;
+    testSourceOptions.numRecords = 10;
+    testSourceOptions.keySizeBytes = 10;
+    testSourceOptions.valueSizeBytes = 20;
+    testSourceOptions.numHotKeys = 3;
+    testSourceOptions.hotKeyFraction = 0.3;
+    testSourceOptions.setSeed(123456);
+    testSourceOptions.bundleSizeDistribution =
+        fromIntegerDistribution(new ZipfDistribution(100, 2.5));
+    testSourceOptions.forceNumInitialBundles = null;
+  }
+
+  /** Deserializes {@code jsonString} into source options and validates the result. */
+  private SyntheticSourceOptions fromString(String jsonString) throws IOException {
+    ObjectMapper mapper = new ObjectMapper();
+    SyntheticSourceOptions result = mapper.readValue(jsonString, SyntheticSourceOptions.class);
+    result.validate();
+    return result;
+  }
+
+  @Test
+  public void testInvalidSourceOptionsJsonFormat() throws Exception {
+    thrown.expect(JsonParseException.class);
+    String syntheticSourceOptions = "input:unknown URI";
+    fromString(syntheticSourceOptions);
+  }
+
+  @Test
+  public void testFromString() throws Exception {
+    String syntheticSourceOptions =
+        "{\"numRecords\":100,\"splitPointFrequencyRecords\":10,\"keySizeBytes\":10,"
+            + "\"valueSizeBytes\":20,\"numHotKeys\":3,"
+            + "\"hotKeyFraction\":0.3,\"seed\":123456,"
+            + "\"bundleSizeDistribution\":{\"type\":\"const\",\"const\":42},"
+            + "\"forceNumInitialBundles\":10,\"progressShape\":\"LINEAR_REGRESSING\""
+            + "}";
+    SyntheticSourceOptions sourceOptions = fromString(syntheticSourceOptions);
+    assertEquals(100, sourceOptions.numRecords);
+    assertEquals(10, sourceOptions.splitPointFrequencyRecords);
+    assertEquals(10, sourceOptions.keySizeBytes);
+    assertEquals(20, sourceOptions.valueSizeBytes);
+    assertEquals(3, sourceOptions.numHotKeys);
+    assertEquals(0.3, sourceOptions.hotKeyFraction, 0);
+    assertEquals(0, sourceOptions.nextDelay(sourceOptions.seed));
+    assertEquals(123456, sourceOptions.seed);
+    assertEquals(42, sourceOptions.bundleSizeDistribution.sample(123), 0.0);
+    assertEquals(10, sourceOptions.forceNumInitialBundles.intValue());
+    assertEquals(SyntheticBoundedIO.ProgressShape.LINEAR_REGRESSING, sourceOptions.progressShape);
+  }
+
+  @Test
+  public void testSourceOptionsWithNegativeNumRecords() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("numRecords should be a non-negative number, but found -100");
+    testSourceOptions.numRecords = -100;
+    testSourceOptions.validate();
+  }
+
+  /** Test the reader and the source produces the same records. */
+  @Test
+  public void testSourceAndReadersWork() throws Exception {
+    testSourceAndReadersWorkP(1);
+    testSourceAndReadersWorkP(-1);
+    testSourceAndReadersWorkP(3);
+  }
+
+  /** Runs the source-vs-reader comparison for one split point frequency. */
+  private void testSourceAndReadersWorkP(long splitPointFrequency) throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    testSourceOptions.splitPointFrequencyRecords = splitPointFrequency;
+    SyntheticBoundedSource source = new SyntheticBoundedSource(testSourceOptions);
+    assertEquals(10 * (10 + 20), source.getEstimatedSizeBytes(options));
+    SourceTestUtils.assertUnstartedReaderReadsSameAsItsSource(
+        source.createReader(options), options);
+  }
+
+  @Test
+  public void testSplitAtFraction() throws Exception {
+    testSplitAtFractionP(1);
+    testSplitAtFractionP(3);
+    // Do not test "-1" because then splits would be vacuous
+  }
+
+  /** Runs the split-at-fraction checks for one split point frequency. */
+  private void testSplitAtFractionP(long splitPointFrequency) throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    testSourceOptions.splitPointFrequencyRecords = splitPointFrequency;
+    SyntheticBoundedSource source = new SyntheticBoundedSource(testSourceOptions);
+    SourceTestUtils.assertSplitAtFractionExhaustive(source, options);
+    // Can't split if already consumed.
+    SourceTestUtils.assertSplitAtFractionFails(source, 5, 0.3, options);
+    SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent(source, 1, 0.3, options);
+  }
+
+  @Test
+  public void testSplitIntoBundles() throws Exception {
+    testSplitIntoBundlesP(1);
+    testSplitIntoBundlesP(-1);
+    testSplitIntoBundlesP(5);
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    testSourceOptions.forceNumInitialBundles = 37;
+    assertEquals(
+        37,
+        new SyntheticBoundedIO.SyntheticBoundedSource(testSourceOptions).split(42, options).size());
+  }
+
+  /** Checks that splits of several desired sizes read the same records as the unsplit source. */
+  private void testSplitIntoBundlesP(long splitPointFrequency) throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    testSourceOptions.splitPointFrequencyRecords = splitPointFrequency;
+    testSourceOptions.numRecords = 100;
+    SyntheticBoundedSource source = new SyntheticBoundedSource(testSourceOptions);
+    SourceTestUtils.assertSourcesEqualReferenceSource(source, source.split(10, options), options);
+    SourceTestUtils.assertSourcesEqualReferenceSource(source, source.split(40, options), options);
+    SourceTestUtils.assertSourcesEqualReferenceSource(source, source.split(100, options), options);
+  }
+
+  @Test
+  public void testIncreasingProgress() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    testSourceOptions.progressShape = SyntheticBoundedIO.ProgressShape.LINEAR;
+    SyntheticBoundedSource source = new SyntheticBoundedSource(testSourceOptions);
+    // try-with-resources so the reader is closed even when an assertion fails.
+    try (BoundedSource.BoundedReader<KV<byte[], byte[]>> reader = source.createReader(options)) {
+      // Reader starts at 0.0 progress.
+      assertEquals(0, reader.getFractionConsumed(), 1e-5);
+      // Set the lastFractionConsumed < 0.0 so that we can use strict inequality in the below loop.
+      double lastFractionConsumed = -1.0;
+      for (boolean more = reader.start(); more; more = reader.advance()) {
+        assertTrue(reader.getFractionConsumed() > lastFractionConsumed);
+        lastFractionConsumed = reader.getFractionConsumed();
+      }
+      assertEquals(1, reader.getFractionConsumed(), 1e-5);
+    }
+  }
+
+  @Test
+  public void testRegressingProgress() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    testSourceOptions.progressShape = SyntheticBoundedIO.ProgressShape.LINEAR_REGRESSING;
+    SyntheticBoundedSource source = new SyntheticBoundedSource(testSourceOptions);
+    // try-with-resources so the reader is closed even when an assertion fails.
+    try (BoundedSource.BoundedReader<KV<byte[], byte[]>> reader = source.createReader(options)) {
+      double lastFractionConsumed = reader.getFractionConsumed();
+      for (boolean more = reader.start(); more; more = reader.advance()) {
+        assertTrue(reader.getFractionConsumed() <= lastFractionConsumed);
+        lastFractionConsumed = reader.getFractionConsumed();
+      }
+    }
+  }
+
+  @Test
+  public void testSplitIntoSingleRecordBundles() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    SyntheticSourceOptions sourceOptions = new SyntheticSourceOptions();
+    sourceOptions.numRecords = 10;
+    sourceOptions.setSeed(123456);
+    sourceOptions.bundleSizeDistribution = fromRealDistribution(new ConstantRealDistribution(1.0));
+    sourceOptions.forceNumInitialBundles = 10;
+    SyntheticBoundedSource source = new SyntheticBoundedSource(sourceOptions);
+    List<SyntheticBoundedSource> sources = source.split(42L, options);
+    for (SyntheticBoundedSource recordSource : sources) {
+      recordSource.validate();
+      assertEquals(1, recordSource.getEndOffset() - recordSource.getStartOffset());
+    }
+    SourceTestUtils.assertSourcesEqualReferenceSource(source, sources, options);
+  }
+}
diff --git a/sdks/python/apache_beam/testing/synthetic_pipeline.py b/sdks/python/apache_beam/testing/synthetic_pipeline.py
new file mode 100644
index 0000000..c76b9cd
--- /dev/null
+++ b/sdks/python/apache_beam/testing/synthetic_pipeline.py
@@ -0,0 +1,502 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A set of utilities to write pipelines for performance tests.
+
+This module offers a way to create pipelines using synthetic sources and steps.
+Exact shape of the pipeline and the behaviour of sources and steps can be
+controlled through arguments. Please see function 'parse_args()' for more
+details about the arguments.
+
+Shape of the pipeline is primarily controlled through two arguments. Argument
+'steps' can be used to define a list of steps as a JSON string. Argument
+'barrier' describes how these steps are separated from each other. Argument
+'barrier' can be used to build a pipeline as a series of steps or a tree of
+steps with a fanin or a fanout of size 2.
+
+Other arguments describe what gets generated by synthetic sources that produce
+data for the pipeline.
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+
+import argparse
+import json
+import logging
+import math
+import time
+
+import apache_beam as beam
+from apache_beam.io import WriteToText
+from apache_beam.io import iobase
+from apache_beam.io import range_trackers
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.testing.test_pipeline import TestPipeline
+
+try:
+ import numpy as np
+except ImportError:
+ np = None
+
+
+def parse_byte_size(s):
+ suffixes = 'BKMGTP'
+ if s[-1] in suffixes:
+ return int(float(s[:-1]) * 1024**suffixes.index(s[-1]))
+
+ return int(s)
+
+
+def div_round_up(a, b):
+ """Return ceil(a/b)."""
+ return int(math.ceil(float(a) / b))
+
+
+def rotate_key(element):
+ """Returns a new key-value pair of the same size but with a different key."""
+ (key, value) = element
+ return key[-1] + key[:-1], value
+
+
+class SyntheticStep(beam.DoFn):
+ """A DoFn of which behavior can be controlled through prespecified parameters.
+ """
+
+ def __init__(self, per_element_delay_sec=0, per_bundle_delay_sec=0,
+ output_records_per_input_record=1, output_filter_ratio=0):
+ if per_element_delay_sec and per_element_delay_sec < 1e-3:
+ raise ValueError('Per element sleep time must be at least 1e-3. '
+ 'Received: %r', per_element_delay_sec)
+ self._per_element_delay_sec = per_element_delay_sec
+ self._per_bundle_delay_sec = per_bundle_delay_sec
+ self._output_records_per_input_record = output_records_per_input_record
+ self._output_filter_ratio = output_filter_ratio
+
+ def start_bundle(self):
+ self._start_time = time.time()
+
+ def finish_bundle(self):
+ # The target is for the enclosing stage to take as close to as possible
+ # the given number of seconds, so we only sleep enough to make up for
+ # overheads not incurred elsewhere.
+ to_sleep = self._per_bundle_delay_sec - (time.time() - self._start_time)
+
+ # Ignoring sub-millisecond sleep times.
+ if to_sleep >= 1e-3:
+ time.sleep(to_sleep)
+
+ def process(self, element):
+ if self._per_element_delay_sec >= 1e-3:
+ time.sleep(self._per_element_delay_sec)
+ filter_element = False
+ if self._output_filter_ratio > 0:
+ if np.random.random() < self._output_filter_ratio:
+ filter_element = True
+
+ if not filter_element:
+ for _ in range(self._output_records_per_input_record):
+ yield element
+
+
+class SyntheticSource(iobase.BoundedSource):
+ """A custom source of a specified size.
+ """
+
+ def __init__(self, input_spec):
+ """Initiates a synthetic source.
+
+ Args:
+ input_spec: Input specification of the source. See corresponding option in
+ function 'parse_args()' below for more details.
+ Raises:
+ ValueError: if input parameters are invalid.
+ """
+
+ def maybe_parse_byte_size(s):
+ return parse_byte_size(s) if isinstance(s, str) else int(s)
+
+ self._num_records = input_spec['numRecords']
+ self._key_size = maybe_parse_byte_size(input_spec.get('keySizeBytes', 1))
+ self._value_size = maybe_parse_byte_size(
+ input_spec.get('valueSizeBytes', 1))
+ self._total_size = self.element_size * self._num_records
+ self._initial_splitting = (
+ input_spec['bundleSizeDistribution']['type']
+ if 'bundleSizeDistribution' in input_spec else 'const')
+ if self._initial_splitting != 'const' and self._initial_splitting != 'zipf':
+ raise ValueError(
+ 'Only const and zipf distributions are supported for determining '
+ 'sizes of bundles produced by initial splitting. Received: %s',
+ self._initial_splitting)
+ self._initial_splitting_num_bundles = (
+ input_spec['forceNumInitialBundles']
+ if 'forceNumInitialBundles' in input_spec else 0)
+ if self._initial_splitting == 'zipf':
+ self._initial_splitting_distribution_parameter = (
+ input_spec['bundleSizeDistribution']['param'])
+ if self._initial_splitting_distribution_parameter < 1:
+ raise ValueError(
+ 'Parameter for a Zipf distribution must be larger than 1. '
+ 'Received %r.', self._initial_splitting_distribution_parameter)
+ else:
+ self._initial_splitting_distribution_parameter = 0
+ self._dynamic_splitting = (
+ 'none' if (
+ 'splitPointFrequencyRecords' in input_spec
+ and input_spec['splitPointFrequencyRecords'] == 0)
+ else 'perfect')
+ if 'delayDistribution' in input_spec:
+ if input_spec['delayDistribution']['type'] != 'const':
+ raise ValueError('SyntheticSource currently only supports delay '
+ 'distributions of type \'const\'. Received %s.',
+ input_spec['delayDistribution']['type'])
+ self._sleep_per_input_record_sec = (
+ float(input_spec['delayDistribution']['const']) / 1000)
+ if (self._sleep_per_input_record_sec and
+ self._sleep_per_input_record_sec < 1e-3):
+ raise ValueError('Sleep time per input record must be at least 1e-3.'
+ ' Received: %r', self._sleep_per_input_record_sec)
+ else:
+ self._sleep_per_input_record_sec = 0
+
+ @property
+ def element_size(self):
+ return self._key_size + self._value_size
+
+ def estimate_size(self):
+ return self._total_size
+
+ def split(self, desired_bundle_size, start_position=0, stop_position=None):
+ # Performs initial splitting of SyntheticSource.
+ #
+ # Exact sizes and distribution of initial splits generated here depends on
+ # the input specification of the SyntheticSource.
+
+ if stop_position is None:
+ stop_position = self._num_records
+ if self._initial_splitting == 'zipf':
+ desired_num_bundles = self._initial_splitting_num_bundles or math.ceil(
+ float(self.estimate_size()) / desired_bundle_size)
+ samples = np.random.zipf(self._initial_splitting_distribution_parameter,
+ desired_num_bundles)
+ total = sum(samples)
+ relative_bundle_sizes = [(float(sample) / total) for sample in samples]
+ bundle_ranges = []
+ start = start_position
+ index = 0
+ while start < stop_position:
+ if index == desired_num_bundles - 1:
+ bundle_ranges.append((start, stop_position))
+ break
+ stop = start + int(self._num_records * relative_bundle_sizes[index])
+ bundle_ranges.append((start, stop))
+ start = stop
+ index += 1
+ else:
+ if self._initial_splitting_num_bundles:
+ bundle_size_in_elements = max(1, self._num_records /
+ self._initial_splitting_num_bundles)
+ else:
+ bundle_size_in_elements = (max(
+ div_round_up(desired_bundle_size, self.element_size),
+ math.floor(math.sqrt(self._num_records))))
+ bundle_ranges = []
+ for start in range(start_position, stop_position,
+ bundle_size_in_elements):
+ stop = min(start + bundle_size_in_elements, stop_position)
+ bundle_ranges.append((start, stop))
+
+ for start, stop in bundle_ranges:
+ yield iobase.SourceBundle(stop - start, self, start, stop)
+
+ def get_range_tracker(self, start_position, stop_position):
+ if start_position is None:
+ start_position = 0
+ if stop_position is None:
+ stop_position = self._num_records
+ tracker = range_trackers.OffsetRangeTracker(start_position, stop_position)
+ if self._dynamic_splitting == 'none':
+ tracker = range_trackers.UnsplittableRangeTracker(tracker)
+ return tracker
+
+ def read(self, range_tracker):
+ index = range_tracker.start_position()
+ while range_tracker.try_claim(index):
+ r = np.random.RandomState(index)
+
+ time.sleep(self._sleep_per_input_record_sec)
+ yield r.bytes(self._key_size), r.bytes(self._value_size)
+ index += 1
+
+ def default_output_coder(self):
+ return beam.coders.TupleCoder(
+ [beam.coders.BytesCoder(), beam.coders.BytesCoder()])
+
+
+class ShuffleBarrier(beam.PTransform):
+
+ def expand(self, pc):
+ return (pc
+ | beam.Map(rotate_key)
+ | beam.GroupByKey()
+ | 'Ungroup' >> beam.FlatMap(
+ lambda elm: [(elm[0], v) for v in elm[1]]))
+
+
+class SideInputBarrier(beam.PTransform):
+
+ def expand(self, pc):
+ return (pc
+ | beam.Map(rotate_key)
+ | beam.Map(
+ lambda elem, ignored: elem,
+ beam.pvalue.AsIter(pc | beam.FlatMap(lambda elem: None))))
+
+
+def merge_using_gbk(name, pc1, pc2):
+ """Merges two given PCollections using a CoGroupByKey."""
+
+ pc1_with_key = pc1 | (name + 'AttachKey1') >> beam.Map(lambda x: (x, x))
+ pc2_with_key = pc2 | (name + 'AttachKey2') >> beam.Map(lambda x: (x, x))
+
+ grouped = (
+ {'pc1': pc1_with_key, 'pc2': pc2_with_key} |
+ (name + 'Group') >> beam.CoGroupByKey())
+ return (grouped |
+ (name + 'DeDup') >> beam.Map(lambda elm: elm[0])) # Ignoring values
+
+
+def merge_using_side_input(name, pc1, pc2):
+ """Merges two given PCollections using side inputs."""
+
+ def join_fn(val, _): # Ignoring side input
+ return val
+
+ return pc1 | name >> beam.core.Map(join_fn, beam.pvalue.AsIter(pc2))
+
+
+def expand_using_gbk(name, pc):
+ """Expands a given PCollection into two copies using GroupByKey."""
+
+ ret = []
+ ret.append((pc | ('%s.a' % name) >> ShuffleBarrier()))
+ ret.append((pc | ('%s.b' % name) >> ShuffleBarrier()))
+ return ret
+
+
+def expand_using_second_output(name, pc):
+ """Expands a given PCollection into two copies using side outputs."""
+
+ class ExpandFn(beam.DoFn):
+
+ def process(self, element):
+ yield beam.pvalue.TaggedOutput('second_out', element)
+ yield element
+
+ pc1, pc2 = (pc | name >> beam.ParDo(
+ ExpandFn()).with_outputs('second_out', main='main_out'))
+ return [pc1, pc2]
+
+
+def _parse_steps(json_str):
+ """Converts the JSON step description into Python objects.
+
+ See property 'steps' for more details about the JSON step description.
+
+ Args:
+ json_str: a JSON string that describes the steps.
+
+ Returns:
+ Information about steps as a list of dictionaries. Each dictionary may have
+ following properties.
+ (1) per_element_delay - amount of delay for each element in seconds.
+ (2) per_bundle_delay - minimum amount of delay for a given step in seconds.
+ (3) output_records_per_input_record - number of output elements generated
+ for each input element to a step.
+ (4) output_filter_ratio - the probability at which a step may filter out a
+ given element by not producing any output for that element.
+ """
+ all_steps = []
+ json_data = json.loads(json_str)
+ for val in json_data:
+ steps = {}
+ steps['per_element_delay'] = (
+ (float(val['per_element_delay_msec']) / 1000)
+ if 'per_element_delay_msec' in val else 0)
+ steps['per_bundle_delay'] = (
+ float(val['per_bundle_delay_sec'])
+ if 'per_bundle_delay_sec' in val else 0)
+ steps['output_records_per_input_record'] = (
+ int(val['output_records_per_input_record'])
+ if 'output_records_per_input_record' in val else 1)
+ steps['output_filter_ratio'] = (
+ float(val['output_filter_ratio'])
+ if 'output_filter_ratio' in val else 0)
+ all_steps.append(steps)
+
+ return all_steps
+
+
+def parse_args(args):
+ """Parses a given set of arguments.
+
+ Args:
+ args: set of arguments to be passed.
+
+ Returns:
+ a tuple where first item gives the set of arguments defined and parsed
+ within this method and second item gives the set of unknown arguments.
+ """
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ '--steps',
+ dest='steps',
+ type=_parse_steps,
+ help='A JSON string that gives a list where each entry of the list is '
+ 'configuration information for a step. Configuration for each step '
+ 'consists of '
+ '(1) A float "per_bundle_delay_sec" (in seconds). Defaults to 0.'
+ '(2) A float "per_element_delay_msec" (in milli seconds). '
+ ' Defaults to 0.'
+ '(3) An integer "output_records_per_input_record". Defaults to 1.'
+ '(4) A float "output_filter_ratio" in the range [0, 1] . '
+ ' Defaults to 0.')
+
+ parser.add_argument(
+ '--input',
+ dest='input',
+ type=json.loads,
+ help='A JSON string that describes the properties of the SyntheticSource '
+ 'used by the pipeline. Configuration is similar to Java '
+ 'SyntheticBoundedInput.'
+ 'Currently supports following properties. '
+ '(1) An integer "numRecords". '
+ '(2) An integer "keySize". '
+ '(3) An integer "valueSize". '
+ '(4) A tuple "bundleSizeDistribution" with following values. '
+ ' A string "type". Allowed values are "const" and "zipf". '
+ ' An float "param". Only used if "type"=="zipf". Must be '
+ ' larger than 1. '
+ '(5) An integer "forceNumInitialBundles". '
+ '(6) An integer "splitPointFrequencyRecords". '
+ '(7) A tuple "delayDistribution" with following values. '
+ ' A string "type". Only allowed value is "const". '
+ ' An integer "const". ')
+
+ parser.add_argument('--barrier',
+ dest='barrier',
+ default='shuffle',
+ choices=['shuffle', 'side-input', 'expand-gbk',
+ 'expand-second-output', 'merge-gbk',
+ 'merge-side-input'],
+ help='Whether to use shuffle as the barrier '
+ '(as opposed to side inputs).')
+ parser.add_argument('--output',
+ dest='output',
+ default='',
+ help='Destination to write output.')
+
+ return parser.parse_known_args(args)
+
+
+def run(argv=None):
+ """Runs the workflow.
+
+ Builds a pipeline that reads from one or more SyntheticSource instances,
+ applies one SyntheticStep per entry of the parsed 'steps' argument with the
+ selected barrier between consecutive steps, and optionally writes the
+ result as text.
+
+ Args:
+ argv: command line arguments handed to parse_args(); arguments it does not
+ recognize are used to build the PipelineOptions.
+ """
+ known_args, pipeline_args = parse_args(argv)
+
+ pipeline_options = PipelineOptions(pipeline_args)
+ pipeline_options.view_as(SetupOptions).save_main_session = True
+
+ input_info = known_args.input
+
+ with TestPipeline(options=pipeline_options) as p:
+ source = SyntheticSource(input_info)
+
+ # pylint: disable=expression-not-assigned
+ barrier = known_args.barrier
+
+ # Merge barriers combine PCollections pairwise before every step but the
+ # first, so they start from 2^(number of steps - 1) reads; every other
+ # barrier starts from a single read.
+ # NOTE(review): '--steps' has no default, so known_args.steps is None when
+ # the flag is omitted and len() below fails - confirm it is always passed.
+ pc_list = []
+ num_roots = 2 ** (len(known_args.steps) - 1) if (
+ barrier == 'merge-gbk' or barrier == 'merge-side-input') else 1
+ for read_no in range(num_roots):
+ pc_list.append((p | ('Read %d' % read_no) >> beam.io.Read(source)))
+
+ for step_no, steps in enumerate(known_args.steps):
+ # Every step except the first is preceded by the barrier, applied to the
+ # current list of PCollections.
+ if step_no != 0:
+ new_pc_list = []
+ for pc_no, pc in enumerate(pc_list):
+ if barrier == 'shuffle':
+ new_pc_list.append(
+ (pc |
+ ('shuffle %d.%d' % (step_no, pc_no)) >> ShuffleBarrier()))
+ elif barrier == 'side-input':
+ new_pc_list.append(
+ (pc |
+ ('side-input %d.%d' % (step_no, pc_no)) >> SideInputBarrier()))
+ # Expand barriers contribute two PCollections per input (extend).
+ elif barrier == 'expand-gbk':
+ new_pc_list.extend(
+ expand_using_gbk(('expand-gbk %d.%d' % (step_no, pc_no)), pc))
+ elif barrier == 'expand-second-output':
+ new_pc_list.extend(
+ expand_using_second_output(
+ ('expand-second-output %d.%d' % (step_no, pc_no)), pc))
+ # Merge barriers pair each even-indexed PCollection with its successor;
+ # odd-indexed ones are skipped because they were already consumed.
+ elif barrier == 'merge-gbk':
+ if pc_no % 2 == 0:
+ new_pc_list.append(
+ merge_using_gbk(('merge-gbk %d.%d' % (step_no, pc_no)),
+ pc, pc_list[pc_no + 1]))
+ else:
+ continue
+ elif barrier == 'merge-side-input':
+ if pc_no % 2 == 0:
+ new_pc_list.append(
+ merge_using_side_input(
+ ('merge-side-input %d.%d' % (step_no, pc_no)),
+ pc, pc_list[pc_no + 1]))
+ else:
+ continue
+
+ pc_list = new_pc_list
+
+ # Apply this step's SyntheticStep to every PCollection currently in play.
+ new_pc_list = []
+ for pc_no, pc in enumerate(pc_list):
+ new_pc = pc | 'SyntheticStep %d.%d' % (step_no, pc_no) >> beam.ParDo(
+ SyntheticStep(
+ per_element_delay_sec=steps['per_element_delay'],
+ per_bundle_delay_sec=steps['per_bundle_delay'],
+ output_records_per_input_record=
+ steps['output_records_per_input_record'],
+ output_filter_ratio=
+ steps['output_filter_ratio']))
+ new_pc_list.append(new_pc)
+ pc_list = new_pc_list
+
+ if known_args.output:
+ # If an output location is provided we format and write output.
+ # Nothing is written when more than one PCollection remains.
+ if len(pc_list) == 1:
+ (pc_list[0] |
+ 'FormatOutput' >> beam.Map(lambda elm: (elm[0] + elm[1])) |
+ 'WriteOutput' >> WriteToText(known_args.output))
+
+ logging.info('Pipeline run completed.')
+
+
+# Script entry point: log at INFO level and run the pipeline. run() is called
+# without arguments, so parse_args() receives argv=None.
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ run()
diff --git a/sdks/python/apache_beam/testing/synthetic_pipeline_test.py b/sdks/python/apache_beam/testing/synthetic_pipeline_test.py
new file mode 100644
index 0000000..fe5e94a
--- /dev/null
+++ b/sdks/python/apache_beam/testing/synthetic_pipeline_test.py
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tests for apache_beam.testing.synthetic_pipeline."""
+
+from __future__ import absolute_import
+
+import glob
+import json
+import logging
+import tempfile
+import time
+import unittest
+
+import apache_beam as beam
+from apache_beam.io import source_test_utils
+from apache_beam.testing import synthetic_pipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+try:
+ import numpy as np
+except ImportError:
+ np = None
+
+
+def input_spec(num_records, key_size, value_size,
+               bundle_size_distribution_type='const',
+               bundle_size_distribution_param=0,
+               force_initial_num_bundles=0):
+  """Builds the spec dictionary handed to SyntheticSource in these tests.
+
+  Args:
+    num_records: value stored under 'numRecords'.
+    key_size: value stored under 'keySizeBytes'.
+    value_size: value stored under 'valueSizeBytes'.
+    bundle_size_distribution_type: 'type' entry of 'bundleSizeDistribution'.
+    bundle_size_distribution_param: 'param' entry of
+      'bundleSizeDistribution'.
+    force_initial_num_bundles: value stored under 'forceNumInitialBundles'.
+
+  Returns:
+    A JSON-serializable dict with the keys listed above.
+  """
+  bundle_size_distribution = {
+      'type': bundle_size_distribution_type,
+      'param': bundle_size_distribution_param,
+  }
+  return dict(
+      numRecords=num_records,
+      keySizeBytes=key_size,
+      valueSizeBytes=value_size,
+      bundleSizeDistribution=bundle_size_distribution,
+      forceNumInitialBundles=force_initial_num_bundles)
+
+
+@unittest.skipIf(np is None, 'Synthetic source dependencies are not installed')
+class SyntheticPipelineTest(unittest.TestCase):
+  """Tests for the synthetic step, the synthetic source and the pipeline.
+
+  The whole test case is skipped when numpy could not be imported.
+  """
+
+  # pylint: disable=expression-not-assigned
+
+  def testSyntheticStep(self):
+    """Feeds 10 elements through a SyntheticStep and expects 100 outputs.
+
+    Also asserts that the wall-clock time of the run lies in [0.5, 3] seconds.
+    """
+    start = time.time()
+    with beam.Pipeline() as p:
+      pcoll = p | beam.Create(list(range(10))) | beam.ParDo(
+          synthetic_pipeline.SyntheticStep(0, 0.5, 10))
+      assert_that(
+          pcoll | beam.combiners.Count.Globally(), equal_to([100]))
+
+    elapsed = time.time() - start
+    # TODO(chamikaramj): Fix the flaky time based bounds.
+    self.assertTrue(0.5 <= elapsed <= 3, elapsed)
+
+  def testSyntheticSource(self):
+    """Reads 300 records and checks the record count, key and value sizes."""
+    def assert_size(element, expected_size):
+      assert len(element) == expected_size
+    with beam.Pipeline() as p:
+      pcoll = (
+          p | beam.io.Read(
+              synthetic_pipeline.SyntheticSource(input_spec(300, 5, 15))))
+      # Element 0 of each record is checked against the key size (5) and
+      # element 1 against the value size (15) given to input_spec.
+      (pcoll
+       | beam.Map(lambda elm: elm[0]) | 'key' >> beam.Map(assert_size, 5))
+      (pcoll
+       | beam.Map(lambda elm: elm[1]) | 'value' >> beam.Map(assert_size, 15))
+      assert_that(pcoll | beam.combiners.Count.Globally(),
+                  equal_to([300]))
+
+  def testSyntheticSourceSplitEven(self):
+    """split(100) on a 1000-record 'const' source must give 20 sub-sources.
+
+    The sub-sources together must be equivalent to the unsplit source.
+    """
+    source = synthetic_pipeline.SyntheticSource(
+        input_spec(1000, 1, 1, 'const', 0))
+    splits = source.split(100)
+    sources_info = [(split.source, split.start_position, split.stop_position)
+                    for split in splits]
+    # assertEqual instead of the deprecated assertEquals alias.
+    self.assertEqual(20, len(sources_info))
+    source_test_utils.assert_sources_equal_reference_source(
+        (source, None, None), sources_info)
+
+  def testSyntheticSourceSplitUneven(self):
+    """A 'zipf' source with forceNumInitialBundles=10 must give 10 splits.
+
+    The sub-sources together must be equivalent to the unsplit source.
+    """
+    source = synthetic_pipeline.SyntheticSource(
+        input_spec(1000, 1, 1, 'zipf', 3, 10))
+    splits = source.split(100)
+    sources_info = [(split.source, split.start_position, split.stop_position)
+                    for split in splits]
+    # assertEqual instead of the deprecated assertEquals alias.
+    self.assertEqual(10, len(sources_info))
+    source_test_utils.assert_sources_equal_reference_source(
+        (source, None, None), sources_info)
+
+  def testSplitAtFraction(self):
+    """Exercises split-at-fraction on a 10-record source via source_test_utils."""
+    source = synthetic_pipeline.SyntheticSource(input_spec(10, 1, 1))
+    source_test_utils.assert_split_at_fraction_exhaustive(source)
+    source_test_utils.assert_split_at_fraction_fails(source, 5, 0.3)
+    source_test_utils.assert_split_at_fraction_succeeds_and_consistent(
+        source, 1, 0.3)
+
+  def run_pipeline(self, barrier, writes_output=True):
+    """Runs synthetic_pipeline.run() with two steps and the given barrier.
+
+    Args:
+      barrier: value passed through the --barrier flag.
+      writes_output: when True, an --output location is passed and the files
+        written under it are expected to hold 10 lines in total.
+    """
+    steps = [{'per_element_delay': 1}, {'per_element_delay': 1}]
+    args = ['--barrier=%s' % barrier, '--runner=DirectRunner',
+            '--steps=%s' % json.dumps(steps),
+            '--input=%s' % json.dumps(input_spec(10, 1, 1))]
+    if writes_output:
+      # Only the generated name is kept; it is used as the output prefix that
+      # the glob below matches against.
+      output_location = tempfile.NamedTemporaryFile().name
+      args.append('--output=%s' % output_location)
+
+    synthetic_pipeline.run(args)
+
+    # Verify output
+    if writes_output:
+      read_output = []
+      for file_name in glob.glob(output_location + '*'):
+        with open(file_name, 'r') as f:
+          read_output.extend(f.read().splitlines())
+
+      self.assertEqual(10, len(read_output))
+
+  def testPipelineShuffle(self):
+    self.run_pipeline('shuffle')
+
+  def testPipelineSideInput(self):
+    self.run_pipeline('side-input')
+
+  def testPipelineExpandGBK(self):
+    self.run_pipeline('expand-gbk', False)
+
+  def testPipelineExpandSideOutput(self):
+    self.run_pipeline('expand-second-output', False)
+
+  def testPipelineMergeGBK(self):
+    self.run_pipeline('merge-gbk')
+
+  def testPipelineMergeSideInput(self):
+    self.run_pipeline('merge-side-input')
+
+
+if __name__ == '__main__':
+  # Show INFO-level log output while the test suite runs.
+  root_logger = logging.getLogger()
+  root_logger.setLevel(logging.INFO)
+  unittest.main()
diff --git a/sdks/python/scripts/generate_pydoc.sh b/sdks/python/scripts/generate_pydoc.sh
index 0e5bfbb..2f6f0f1 100755
--- a/sdks/python/scripts/generate_pydoc.sh
+++ b/sdks/python/scripts/generate_pydoc.sh
@@ -178,6 +178,7 @@ nitpicky = True
nitpick_ignore = []
nitpick_ignore += [('py:class', iden) for iden in ignore_identifiers]
nitpick_ignore += [('py:obj', iden) for iden in ignore_identifiers]
+nitpick_ignore += [('py:exc', 'ValueError')]
EOF
#=== index.rst ===#
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 46c8050..43df52b 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -128,6 +128,10 @@ REQUIRED_TEST_PACKAGES = [
'pyhamcrest>=1.9,<2.0',
]
+# Extra packages for the performance-testing utilities; exposed through the
+# 'perftest' entry of extras_require.
+REQUIRED_PERF_TEST_PACKAGES = [
+    'numpy>=1.14.3',
+]
+
GCP_REQUIREMENTS = [
# oauth2client >=4 only works with google-apitools>=0.5.18.
'google-apitools>=0.5.18,<=0.5.20',
@@ -192,7 +196,8 @@ setuptools.setup(
extras_require={
'docs': ['Sphinx>=1.5.2,<2.0'],
'test': REQUIRED_TEST_PACKAGES,
- 'gcp': GCP_REQUIREMENTS
+ 'gcp': GCP_REQUIREMENTS,
+ 'perftest': REQUIRED_PERF_TEST_PACKAGES,
},
zip_safe=False,
# PyPI package information.
diff --git a/settings.gradle b/settings.gradle
index 90bf3f9..a8ed673 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -152,6 +152,8 @@ include "beam-sdks-java-io-tika"
project(":beam-sdks-java-io-tika").dir = file("sdks/java/io/tika")
include "beam-sdks-java-io-xml"
project(":beam-sdks-java-io-xml").dir = file("sdks/java/io/xml")
+include "beam-sdks-java-io-synthetic"
+project(":beam-sdks-java-io-synthetic").dir = file("sdks/java/io/synthetic")
include "beam-sdks-java-javadoc"
project(":beam-sdks-java-javadoc").dir = file("sdks/java/javadoc")
include "beam-sdks-java-maven-archetypes-examples"