You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/31 01:37:31 UTC

[2/2] incubator-beam git commit: Add UnboundedCountingInput#withRate

Add UnboundedCountingInput#withRate

The rate controls the speed at which UnboundedCountingInput outputs
elements. This is an aggregate rate across all instances of the
source, and thus elements will not necessarily be output "smoothly",
or within the first period. The aggregate rate, however, will be
approximately equal to the provided rate.

Add package-private CountingSource#createUnbounded() to expose the
UnboundedCountingSource type. Make UnboundedCountingSource
package-private.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b21a0be2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b21a0be2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b21a0be2

Branch: refs/heads/master
Commit: b21a0be25cab6c3a82fafbe18ee3d487f5741fa1
Parents: ffb7002
Author: Thomas Groh <tg...@google.com>
Authored: Mon Feb 29 13:48:09 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Mar 30 16:37:17 2016 -0700

----------------------------------------------------------------------
 .../cloud/dataflow/sdk/io/CountingInput.java    | 42 ++++++++-
 .../cloud/dataflow/sdk/io/CountingSource.java   | 99 ++++++++++++++++++--
 .../dataflow/sdk/io/CountingInputTest.java      | 25 +++++
 .../dataflow/sdk/io/CountingSourceTest.java     | 73 +++++++++++++++
 4 files changed, 226 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b21a0be2/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java
index 810c252..892682d 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java
@@ -90,7 +90,11 @@ public class CountingInput {
    */
   public static UnboundedCountingInput unbounded() {
     return new UnboundedCountingInput(
-        new NowTimestampFn(), Optional.<Long>absent(), Optional.<Duration>absent());
+        new NowTimestampFn(),
+        1L /* Elements per period */,
+        Duration.ZERO /* Period length */,
+        Optional.<Long>absent() /* Maximum number of records */,
+        Optional.<Duration>absent() /* Maximum read duration */);
   }
 
   /**
@@ -126,14 +130,20 @@ public class CountingInput {
    */
   public static class UnboundedCountingInput extends PTransform<PBegin, PCollection<Long>> {
     private final SerializableFunction<Long, Instant> timestampFn;
+    private final long elementsPerPeriod;
+    private final Duration period;
     private final Optional<Long> maxNumRecords;
     private final Optional<Duration> maxReadTime;
 
     private UnboundedCountingInput(
         SerializableFunction<Long, Instant> timestampFn,
+        long elementsPerPeriod,
+        Duration period,
         Optional<Long> maxNumRecords,
         Optional<Duration> maxReadTime) {
       this.timestampFn = timestampFn;
+      this.elementsPerPeriod = elementsPerPeriod;
+      this.period = period;
       this.maxNumRecords = maxNumRecords;
       this.maxReadTime = maxReadTime;
     }
@@ -145,7 +155,8 @@ public class CountingInput {
      * <p>Note that the timestamps produced by {@code timestampFn} may not decrease.
      */
     public UnboundedCountingInput withTimestampFn(SerializableFunction<Long, Instant> timestampFn) {
-      return new UnboundedCountingInput(timestampFn, maxNumRecords, maxReadTime);
+      return new UnboundedCountingInput(
+          timestampFn, elementsPerPeriod, period, maxNumRecords, maxReadTime);
     }
 
     /**
@@ -158,7 +169,23 @@ public class CountingInput {
     public UnboundedCountingInput withMaxNumRecords(long maxRecords) {
       checkArgument(
           maxRecords > 0, "MaxRecords must be a positive (nonzero) value. Got %s", maxRecords);
-      return new UnboundedCountingInput(timestampFn, Optional.of(maxRecords), maxReadTime);
+      return new UnboundedCountingInput(
+          timestampFn, elementsPerPeriod, period, Optional.of(maxRecords), maxReadTime);
+    }
+
+    /**
+     * Returns an {@link UnboundedCountingInput} like this one, but with output production limited
+     * to an aggregate rate of no more than {@code numElements} elements per {@code periodLength}.
+     *
+     * <p>Note that when there are multiple splits, each split outputs independently. This may lead
+     * to elements not being produced evenly across time, though the aggregate rate will still
+     * approach the specified rate.
+     *
+     * <p>A duration of {@link Duration#ZERO} will produce output as fast as possible.
+     */
+    public UnboundedCountingInput withRate(long numElements, Duration periodLength) {
+      return new UnboundedCountingInput(
+          timestampFn, numElements, periodLength, maxNumRecords, maxReadTime);
     }
 
     /**
@@ -170,13 +197,18 @@ public class CountingInput {
      */
     public UnboundedCountingInput withMaxReadTime(Duration readTime) {
       checkNotNull(readTime, "ReadTime cannot be null");
-      return new UnboundedCountingInput(timestampFn, maxNumRecords, Optional.of(readTime));
+      return new UnboundedCountingInput(
+          timestampFn, elementsPerPeriod, period, maxNumRecords, Optional.of(readTime));
     }
 
     @SuppressWarnings("deprecation")
     @Override
     public PCollection<Long> apply(PBegin begin) {
-      Unbounded<Long> read = Read.from(CountingSource.unboundedWithTimestampFn(timestampFn));
+      Unbounded<Long> read =
+          Read.from(
+              CountingSource.createUnbounded()
+                  .withTimestampFn(timestampFn)
+                  .withRate(elementsPerPeriod, period));
       if (!maxNumRecords.isPresent() && !maxReadTime.isPresent()) {
         return begin.apply(read);
       } else if (maxNumRecords.isPresent() && !maxReadTime.isPresent()) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b21a0be2/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java
index 0b68edb..78c98cb 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java
@@ -18,6 +18,7 @@
 package com.google.cloud.dataflow.sdk.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.cloud.dataflow.sdk.coders.AvroCoder;
 import com.google.cloud.dataflow.sdk.coders.Coder;
@@ -30,6 +31,7 @@ import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.common.collect.ImmutableList;
 
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 
 import java.io.IOException;
@@ -83,6 +85,14 @@ public class CountingSource {
   }
 
   /**
+   * Create a new {@link UnboundedCountingSource}.
+   */
+  // package-private to return a typed UnboundedCountingSource rather than the UnboundedSource type.
+  static UnboundedCountingSource createUnbounded() {
+    return new UnboundedCountingSource(0, 1, 1L, Duration.ZERO, new NowTimestampFn());
+  }
+
+  /**
    * Creates an {@link UnboundedSource} that will produce numbers starting from {@code 0} up to
    * {@link Long#MAX_VALUE}.
    *
@@ -114,7 +124,7 @@ public class CountingSource {
   @Deprecated
   public static UnboundedSource<Long, CounterMark> unboundedWithTimestampFn(
       SerializableFunction<Long, Instant> timestampFn) {
-    return new UnboundedCountingSource(0, 1, timestampFn);
+    return new UnboundedCountingSource(0, 1, 1L, Duration.ZERO, timestampFn);
   }
 
   /////////////////////////////////////////////////////////////////////////////////////////////
@@ -227,11 +237,15 @@ public class CountingSource {
   /**
    * An implementation of {@link CountingSource} that produces an unbounded {@link PCollection}.
    */
-  private static class UnboundedCountingSource extends UnboundedSource<Long, CounterMark> {
+  static class UnboundedCountingSource extends UnboundedSource<Long, CounterMark> {
     /** The first number (>= 0) generated by this {@link UnboundedCountingSource}. */
     private final long start;
     /** The interval between numbers generated by this {@link UnboundedCountingSource}. */
     private final long stride;
+    /** The number of elements to produce each period. */
+    private final long elementsPerPeriod;
+    /** The length of the period in which {@code elementsPerPeriod} elements are produced. */
+    private final Duration period;
     /** The function used to produce timestamps for the generated elements. */
     private final SerializableFunction<Long, Instant> timestampFn;
 
@@ -244,14 +258,46 @@ public class CountingSource {
      *
      * <p>Note that the timestamps produced by {@code timestampFn} may not decrease.
      */
-    public UnboundedCountingSource(
-        long start, long stride, SerializableFunction<Long, Instant> timestampFn) {
+    private UnboundedCountingSource(
+        long start,
+        long stride,
+        long elementsPerPeriod,
+        Duration period,
+        SerializableFunction<Long, Instant> timestampFn) {
       this.start = start;
       this.stride = stride;
+      checkArgument(
+          elementsPerPeriod > 0L,
+          "Must produce at least one element per period, got %s",
+          elementsPerPeriod);
+      this.elementsPerPeriod = elementsPerPeriod;
+      checkArgument(
+          period.getMillis() >= 0L, "Must have a non-negative period length, got %s", period);
+      this.period = period;
       this.timestampFn = timestampFn;
     }
 
     /**
+     * Returns an {@link UnboundedCountingSource} like this one that produces elements at an
+     * aggregate rate of at most {@code elementsPerPeriod} elements per {@code period}.
+     */
+    public UnboundedCountingSource withRate(long elementsPerPeriod, Duration period) {
+      return new UnboundedCountingSource(start, stride, elementsPerPeriod, period, timestampFn);
+    }
+
+    /**
+     * Returns an {@link UnboundedCountingSource} like this one where the timestamp of output
+     * elements are supplied by the specified function.
+     *
+     * <p>Note that timestamps produced by {@code timestampFn} may not decrease.
+     */
+    public UnboundedCountingSource withTimestampFn(
+        SerializableFunction<Long, Instant> timestampFn) {
+      checkNotNull(timestampFn);
+      return new UnboundedCountingSource(start, stride, elementsPerPeriod, period, timestampFn);
+    }
+
+    /**
      * Splits an unbounded source {@code desiredNumSplits} ways by giving each split every
      * {@code desiredNumSplits}th element that this {@link UnboundedCountingSource}
      * produces.
@@ -271,7 +317,9 @@ public class CountingSource {
       for (int i = 0; i < desiredNumSplits; ++i) {
         // Starts offset by the original stride. Using Javadoc example, this generates starts of
         // 0, 2, and 4.
-        splits.add(new UnboundedCountingSource(start + i * stride, newStride, timestampFn));
+        splits.add(
+            new UnboundedCountingSource(
+                start + i * stride, newStride, elementsPerPeriod, period, timestampFn));
       }
       return splits.build();
     }
@@ -305,6 +353,7 @@ public class CountingSource {
     private UnboundedCountingSource source;
     private long current;
     private Instant currentTimestamp;
+    private Instant firstStarted;
 
     public UnboundedCountingReader(UnboundedCountingSource source, CounterMark mark) {
       this.source = source;
@@ -314,11 +363,15 @@ public class CountingSource {
         this.current = source.start - source.stride;
       } else {
         this.current = mark.getLastEmitted();
+        this.firstStarted = mark.getStartTime();
       }
     }
 
     @Override
     public boolean start() throws IOException {
+      if (firstStarted == null) {
+        this.firstStarted = Instant.now();
+      }
       return advance();
     }
 
@@ -328,11 +381,25 @@ public class CountingSource {
       if (Long.MAX_VALUE - source.stride < current) {
         return false;
       }
-      current += source.stride;
+      long nextValue = current + source.stride;
+      if (expectedValue() < nextValue) {
+        return false;
+      }
+      current = nextValue;
       currentTimestamp = source.timestampFn.apply(current);
       return true;
     }
 
+    private long expectedValue() {
+      if (source.period.getMillis() == 0L) {
+        return Long.MAX_VALUE;
+      }
+      double periodsElapsed =
+          (Instant.now().getMillis() - firstStarted.getMillis())
+              / (double) source.period.getMillis();
+      return (long) (source.elementsPerPeriod * periodsElapsed);
+    }
+
     @Override
     public Instant getWatermark() {
       return source.timestampFn.apply(current);
@@ -340,7 +407,7 @@ public class CountingSource {
 
     @Override
     public CounterMark getCheckpointMark() {
-      return new CounterMark(current);
+      return new CounterMark(current, firstStarted);
     }
 
     @Override
@@ -360,6 +427,12 @@ public class CountingSource {
 
     @Override
     public void close() throws IOException {}
+
+    @Override
+    public long getSplitBacklogBytes() {
+      long expected = expectedValue();
+      return Math.max(0L, 8 * (expected - current) / source.stride);
+    }
   }
 
   /**
@@ -370,12 +443,14 @@ public class CountingSource {
   public static class CounterMark implements UnboundedSource.CheckpointMark {
     /** The last value emitted. */
     private final long lastEmitted;
+    private final Instant startTime;
 
     /**
      * Creates a checkpoint mark reflecting the last emitted value.
      */
-    public CounterMark(long lastEmitted) {
+    public CounterMark(long lastEmitted, Instant startTime) {
       this.lastEmitted = lastEmitted;
+      this.startTime = startTime;
     }
 
     /**
@@ -385,11 +460,19 @@ public class CountingSource {
       return lastEmitted;
     }
 
+    /**
+     * Returns the time the reader was started.
+     */
+    public Instant getStartTime() {
+      return startTime;
+    }
+
     /////////////////////////////////////////////////////////////////////////////////////
 
     @SuppressWarnings("unused") // For AvroCoder
     private CounterMark() {
       this.lastEmitted = 0L;
+      this.startTime = Instant.now();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b21a0be2/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java
index ddba14e..5a7c2fb 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java
@@ -18,6 +18,9 @@
  */
 package com.google.cloud.dataflow.sdk.io;
 
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
 import com.google.cloud.dataflow.sdk.Pipeline;
 import com.google.cloud.dataflow.sdk.io.CountingInput.UnboundedCountingInput;
 import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
@@ -32,6 +35,7 @@ import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
 import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -83,6 +87,27 @@ public class CountingInputTest {
     p.run();
   }
 
+  @Test
+  public void testUnboundedInputRate() {
+    Pipeline p = TestPipeline.create();
+    long numElements = 5000;
+
+    long elemsPerPeriod = 10L;
+    Duration periodLength = Duration.millis(8);
+    PCollection<Long> input =
+        p.apply(
+            CountingInput.unbounded()
+                .withRate(elemsPerPeriod, periodLength)
+                .withMaxNumRecords(numElements));
+
+    addCountingAsserts(input, numElements);
+    long expectedRuntimeMillis = (periodLength.getMillis() * numElements) / elemsPerPeriod;
+    Instant startTime = Instant.now();
+    p.run();
+    Instant endTime = Instant.now();
+    assertThat(endTime.isAfter(startTime.plus(expectedRuntimeMillis)), is(true));
+  }
+
   private static class ElementValueDiff extends DoFn<Long, Long> {
     @Override
     public void processElement(ProcessContext c) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b21a0be2/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingSourceTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingSourceTest.java
index 796d46c..09c1f2a 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingSourceTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingSourceTest.java
@@ -17,11 +17,15 @@
  */
 package com.google.cloud.dataflow.sdk.io;
 
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import com.google.cloud.dataflow.sdk.Pipeline;
 import com.google.cloud.dataflow.sdk.io.CountingSource.CounterMark;
+import com.google.cloud.dataflow.sdk.io.CountingSource.UnboundedCountingSource;
 import com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader;
 import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
 import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
@@ -38,6 +42,7 @@ import com.google.cloud.dataflow.sdk.util.CoderUtils;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.PCollectionList;
 
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -151,6 +156,40 @@ public class CountingSourceTest {
   }
 
   @Test
+  public void testUnboundedSourceWithRate() {
+    Pipeline p = TestPipeline.create();
+
+    Duration period = Duration.millis(5);
+    long numElements = 1000L;
+
+    PCollection<Long> input =
+        p.apply(
+            Read.from(
+                    CountingSource.createUnbounded()
+                        .withTimestampFn(new ValueAsTimestampFn())
+                        .withRate(1, period))
+                .withMaxNumRecords(numElements));
+    addCountingAsserts(input, numElements);
+
+    PCollection<Long> diffs =
+        input
+            .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
+            .apply("RemoveDuplicateTimestamps", RemoveDuplicates.<Long>create());
+    // This assert also confirms that diffs only has one unique value.
+    DataflowAssert.thatSingleton(diffs).isEqualTo(0L);
+
+    Instant started = Instant.now();
+    p.run();
+    Instant finished = Instant.now();
+    Duration expectedDuration = period.multipliedBy((int) numElements);
+    assertThat(
+        started
+            .plus(expectedDuration)
+            .isBefore(finished),
+        is(true));
+  }
+
+  @Test
   @Category(RunnableOnService.class)
   public void testUnboundedSourceSplits() throws Exception {
     Pipeline p = TestPipeline.create();
@@ -175,6 +214,40 @@ public class CountingSourceTest {
     p.run();
   }
 
+  @Test
+  public void testUnboundedSourceRateSplits() throws Exception {
+    Pipeline p = TestPipeline.create();
+    int elementsPerPeriod = 10;
+    Duration period = Duration.millis(5);
+
+    long numElements = 1000;
+    int numSplits = 10;
+
+    UnboundedCountingSource initial =
+        CountingSource.createUnbounded().withRate(elementsPerPeriod, period);
+    List<? extends UnboundedSource<Long, ?>> splits =
+        initial.generateInitialSplits(numSplits, p.getOptions());
+    assertEquals("Expected exact splitting", numSplits, splits.size());
+
+    long elementsPerSplit = numElements / numSplits;
+    assertEquals("Expected even splits", numElements, elementsPerSplit * numSplits);
+    PCollectionList<Long> pcollections = PCollectionList.empty(p);
+    for (int i = 0; i < splits.size(); ++i) {
+      pcollections =
+          pcollections.and(
+              p.apply("split" + i, Read.from(splits.get(i)).withMaxNumRecords(elementsPerSplit)));
+    }
+    PCollection<Long> input = pcollections.apply(Flatten.<Long>pCollections());
+
+    addCountingAsserts(input, numElements);
+    Instant startTime = Instant.now();
+    p.run();
+    Instant endTime = Instant.now();
+    // 500 ms if the readers are all initialized in parallel; 5000 ms if they are evaluated serially
+    long expectedMinimumMillis = (numElements * period.getMillis()) / elementsPerPeriod;
+    assertThat(expectedMinimumMillis, lessThan(endTime.getMillis() - startTime.getMillis()));
+  }
+
   /**
    * A timestamp function that uses the given value as the timestamp. Because the input values will
    * not wrap, this function is non-decreasing and meets the timestamp function criteria laid out