You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/21 23:56:17 UTC

[2/5] beam git commit: Deletes CountingInput

Deletes CountingInput


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/57eeaae1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/57eeaae1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/57eeaae1

Branch: refs/heads/master
Commit: 57eeaae11ea248c8145f467148e799d6c3565402
Parents: 6a9a24c
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Apr 19 15:36:42 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Apr 21 16:53:50 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/CountingInput.java   | 283 -------------------
 .../apache/beam/sdk/io/CountingInputTest.java   | 221 ---------------
 2 files changed, 504 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/57eeaae1/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
deleted file mode 100644
index ab006d4..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.base.Optional;
-import org.apache.beam.sdk.io.CountingSource.NowTimestampFn;
-import org.apache.beam.sdk.io.Read.Unbounded;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * A {@link PTransform} that produces longs. When used to produce a
- * {@link IsBounded#BOUNDED bounded} {@link PCollection}, {@link CountingInput} starts at {@code 0}
- * or starting value, and counts up to a specified maximum. When used to produce an
- * {@link IsBounded#UNBOUNDED unbounded} {@link PCollection}, it counts up to {@link Long#MAX_VALUE}
- * and then never produces more output. (In practice, this limit should never be reached.)
- *
- * <p>The bounded {@link CountingInput} is implemented based on {@link OffsetBasedSource} and
- * {@link OffsetBasedSource.OffsetBasedReader}, so it performs efficient initial splitting and it
- * supports dynamic work rebalancing.
- *
- * <p>To produce a bounded {@code PCollection<Long>} starting from {@code 0},
- * use {@link CountingInput#upTo(long)}:
- *
- * <pre>{@code
- * Pipeline p = ...
- * PTransform<PBegin, PCollection<Long>> producer = CountingInput.upTo(1000);
- * PCollection<Long> bounded = p.apply(producer);
- * }</pre>
- *
- * <p>To produce a bounded {@code PCollection<Long>} starting from {@code startOffset},
- * use {@link CountingInput#forSubrange(long, long)} instead.
- *
- * <p>To produce an unbounded {@code PCollection<Long>}, use {@link CountingInput#unbounded()},
- * calling {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to provide values
- * with timestamps other than {@link Instant#now}.
- *
- * <pre>{@code
- * Pipeline p = ...
- *
- * // To create an unbounded producer that uses processing time as the element timestamp.
- * PCollection<Long> unbounded = p.apply(CountingInput.unbounded());
- * // Or, to create an unbounded source that uses a provided function to set the element timestamp.
- * PCollection<Long> unboundedWithTimestamps =
- *     p.apply(CountingInput.unbounded().withTimestampFn(someFn));
- * }</pre>
- */
-public class CountingInput {
-  /**
-   * Creates a {@link BoundedCountingInput} that will produce the specified number of elements,
-   * from {@code 0} to {@code numElements - 1}.
-   */
-  public static BoundedCountingInput upTo(long numElements) {
-    checkArgument(numElements >= 0,
-        "numElements (%s) must be greater than or equal to 0",
-        numElements);
-    return new BoundedCountingInput(numElements);
-  }
-
-  /**
-   * Creates a {@link BoundedCountingInput} that will produce elements
-   * starting from {@code startIndex} (inclusive) to {@code endIndex} (exclusive).
-   * If {@code startIndex == endIndex}, then no elements will be produced.
-   */
-  public static BoundedCountingInput forSubrange(long startIndex, long endIndex) {
-    checkArgument(endIndex >= startIndex,
-        "endIndex (%s) must be greater than or equal to startIndex (%s)",
-        endIndex, startIndex);
-    return new BoundedCountingInput(startIndex, endIndex);
-  }
-
-  /**
-   * Creates an {@link UnboundedCountingInput} that will produce numbers starting from {@code 0} up
-   * to {@link Long#MAX_VALUE}.
-   *
-   * <p>After {@link Long#MAX_VALUE}, the transform never produces more output. (In practice, this
-   * limit should never be reached.)
-   *
-   * <p>Elements in the resulting {@link PCollection PCollection&lt;Long&gt;} will by default have
-   * timestamps corresponding to processing time at element generation, provided by
-   * {@link Instant#now}. Use the transform returned by
-   * {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to control the output
-   * timestamps.
-   */
-  public static UnboundedCountingInput unbounded() {
-    return new UnboundedCountingInput(
-        new NowTimestampFn(),
-        1L /* Elements per period */,
-        Duration.ZERO /* Period length */,
-        Optional.<Long>absent() /* Maximum number of records */,
-        Optional.<Duration>absent() /* Maximum read duration */);
-  }
-
-  /**
-   * A {@link PTransform} that will produce a specified number of {@link Long Longs} starting from
-   * 0.
-   *
-   * <pre>{@code
-   * PCollection<Long> bounded = p.apply(CountingInput.upTo(10L));
-   * }</pre>
-   */
-  public static class BoundedCountingInput extends PTransform<PBegin, PCollection<Long>> {
-    private final long startIndex;
-    private final long endIndex;
-
-    private BoundedCountingInput(long numElements) {
-      this.endIndex = numElements;
-      this.startIndex = 0;
-    }
-
-    private BoundedCountingInput(long startIndex, long endIndex) {
-      this.endIndex = endIndex;
-      this.startIndex = startIndex;
-    }
-
-    @Override
-    public PCollection<Long> expand(PBegin begin) {
-      return begin.apply(Read.from(CountingSource.createSourceForSubrange(startIndex, endIndex)));
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-
-      if (startIndex == 0) {
-            builder.add(DisplayData.item("upTo", endIndex)
-                .withLabel("Count Up To"));
-      } else {
-            builder.add(DisplayData.item("startAt", startIndex).withLabel("Count Starting At"))
-                    .add(DisplayData.item("upTo", endIndex).withLabel("Count Up To"));
-        }
-    }
-  }
-
-  /**
-   * A {@link PTransform} that will produce numbers starting from {@code 0} up to
-   * {@link Long#MAX_VALUE}.
-   *
-   * <p>After {@link Long#MAX_VALUE}, the transform never produces more output. (In practice, this
-   * limit should never be reached.)
-   *
-   * <p>Elements in the resulting {@link PCollection PCollection&lt;Long&gt;} will by default have
-   * timestamps corresponding to processing time at element generation, provided by
-   * {@link Instant#now}. Use the transform returned by
-   * {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to control the output
-   * timestamps.
-   */
-  public static class UnboundedCountingInput extends PTransform<PBegin, PCollection<Long>> {
-    private final SerializableFunction<Long, Instant> timestampFn;
-    private final long elementsPerPeriod;
-    private final Duration period;
-    private final Optional<Long> maxNumRecords;
-    private final Optional<Duration> maxReadTime;
-
-    private UnboundedCountingInput(
-        SerializableFunction<Long, Instant> timestampFn,
-        long elementsPerPeriod,
-        Duration period,
-        Optional<Long> maxNumRecords,
-        Optional<Duration> maxReadTime) {
-      this.timestampFn = timestampFn;
-      this.elementsPerPeriod = elementsPerPeriod;
-      this.period = period;
-      this.maxNumRecords = maxNumRecords;
-      this.maxReadTime = maxReadTime;
-    }
-
-    /**
-     * Returns an {@link UnboundedCountingInput} like this one, but where output elements have the
-     * timestamp specified by the timestampFn.
-     *
-     * <p>Note that the timestamps produced by {@code timestampFn} may not decrease.
-     */
-    public UnboundedCountingInput withTimestampFn(SerializableFunction<Long, Instant> timestampFn) {
-      return new UnboundedCountingInput(
-          timestampFn, elementsPerPeriod, period, maxNumRecords, maxReadTime);
-    }
-
-    /**
-     * Returns an {@link UnboundedCountingInput} like this one, but that will read at most the
-     * specified number of elements.
-     *
-     * <p>A bounded amount of elements will be produced by the result transform, and the result
-     * {@link PCollection} will be {@link IsBounded#BOUNDED bounded}.
-     */
-    public UnboundedCountingInput withMaxNumRecords(long maxRecords) {
-      checkArgument(
-          maxRecords > 0, "MaxRecords must be a positive (nonzero) value. Got %s", maxRecords);
-      return new UnboundedCountingInput(
-          timestampFn, elementsPerPeriod, period, Optional.of(maxRecords), maxReadTime);
-    }
-
-    /**
-     * Returns an {@link UnboundedCountingInput} like this one, but with output production limited
-     * to an aggregate rate of no more than the number of elements per the period length.
-     *
-     * <p>Note that when there are multiple splits, each split outputs independently. This may lead
-     * to elements not being produced evenly across time, though the aggregate rate will still
-     * approach the specified rate.
-     *
-     * <p>A duration of {@link Duration#ZERO} will produce output as fast as possible.
-     */
-    public UnboundedCountingInput withRate(long numElements, Duration periodLength) {
-      return new UnboundedCountingInput(
-          timestampFn, numElements, periodLength, maxNumRecords, maxReadTime);
-    }
-
-    /**
-     * Returns an {@link UnboundedCountingInput} like this one, but that will read for at most the
-     * specified amount of time.
-     *
-     * <p>A bounded amount of elements will be produced by the result transform, and the result
-     * {@link PCollection} will be {@link IsBounded#BOUNDED bounded}.
-     */
-    public UnboundedCountingInput withMaxReadTime(Duration readTime) {
-      checkNotNull(readTime, "ReadTime cannot be null");
-      return new UnboundedCountingInput(
-          timestampFn, elementsPerPeriod, period, maxNumRecords, Optional.of(readTime));
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    public PCollection<Long> expand(PBegin begin) {
-      Unbounded<Long> read =
-          Read.from(
-              CountingSource.createUnboundedFrom(0)
-                  .withTimestampFn(timestampFn)
-                  .withRate(elementsPerPeriod, period));
-      if (!maxNumRecords.isPresent() && !maxReadTime.isPresent()) {
-        return begin.apply(read);
-      } else if (maxNumRecords.isPresent() && !maxReadTime.isPresent()) {
-        return begin.apply(read.withMaxNumRecords(maxNumRecords.get()));
-      } else if (!maxNumRecords.isPresent() && maxReadTime.isPresent()) {
-        return begin.apply(read.withMaxReadTime(maxReadTime.get()));
-      } else {
-        return begin.apply(
-            read.withMaxReadTime(maxReadTime.get()).withMaxNumRecords(maxNumRecords.get()));
-      }
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-
-      builder.add(DisplayData.item("timestampFn", timestampFn.getClass())
-        .withLabel("Timestamp Function"));
-
-      if (maxReadTime.isPresent()) {
-        builder.add(DisplayData.item("maxReadTime", maxReadTime.get())
-          .withLabel("Maximum Read Time"));
-      }
-
-      if (maxNumRecords.isPresent()) {
-        builder.add(DisplayData.item("maxRecords", maxNumRecords.get())
-          .withLabel("Maximum Read Records"));
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/57eeaae1/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
deleted file mode 100644
index e7a6cfd..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io;
-
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.sdk.io.CountingInput.UnboundedCountingInput;
-import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.ValidatesRunner;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Distinct;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link CountingInput}.
- */
-@RunWith(JUnit4.class)
-public class CountingInputTest {
-  public static void addCountingAsserts(PCollection<Long> input, long start, long end) {
-    // Count == numElements
-    PAssert.thatSingleton(input.apply("Count", Count.<Long>globally()))
-        .isEqualTo(end - start);
-    // Unique count == numElements
-    PAssert.thatSingleton(
-            input
-                .apply(Distinct.<Long>create())
-                .apply("UniqueCount", Count.<Long>globally()))
-        .isEqualTo(end - start);
-    // Min == start
-    PAssert.thatSingleton(input.apply("Min", Min.<Long>globally())).isEqualTo(start);
-    // Max == end-1
-    PAssert.thatSingleton(input.apply("Max", Max.<Long>globally()))
-        .isEqualTo(end - 1);
-  }
-
-  @Rule
-  public TestPipeline p = TestPipeline.create();
-
-  @Test
-  @Category(ValidatesRunner.class)
-  public void testBoundedInput() {
-    long numElements = 1000;
-    PCollection<Long> input = p.apply(CountingInput.upTo(numElements));
-
-    addCountingAsserts(input, 0, numElements);
-    p.run();
-  }
-
-  @Test
-  @Category(ValidatesRunner.class)
-  public void testEmptyBoundedInput() {
-    PCollection<Long> input = p.apply(CountingInput.upTo(0));
-
-    PAssert.that(input).empty();
-    p.run();
-  }
-
-  @Test
-  @Category(ValidatesRunner.class)
-  public void testEmptyBoundedInputSubrange() {
-    PCollection<Long> input = p.apply(CountingInput.forSubrange(42, 42));
-
-    PAssert.that(input).empty();
-    p.run();
-  }
-
-
-  @Test
-  @Category(ValidatesRunner.class)
-  public void testBoundedInputSubrange() {
-    long start = 10;
-    long end = 1000;
-    PCollection<Long> input = p.apply(CountingInput.forSubrange(start, end));
-
-    addCountingAsserts(input, start, end);
-    p.run();
-  }
-
-  @Test
-  public void testBoundedDisplayData() {
-    PTransform<?, ?> input = CountingInput.upTo(1234);
-    DisplayData displayData = DisplayData.from(input);
-    assertThat(displayData, hasDisplayItem("upTo", 1234));
-  }
-
-  @Test
-  public void testBoundedDisplayDataSubrange() {
-    PTransform<?, ?> input = CountingInput.forSubrange(12, 1234);
-    DisplayData displayData = DisplayData.from(input);
-    assertThat(displayData, hasDisplayItem("startAt", 12));
-    assertThat(displayData, hasDisplayItem("upTo", 1234));
-  }
-
-  @Test
-  @Category(ValidatesRunner.class)
-  public void testUnboundedInput() {
-    long numElements = 1000;
-
-    PCollection<Long> input = p.apply(CountingInput.unbounded().withMaxNumRecords(numElements));
-
-    addCountingAsserts(input, 0, numElements);
-    p.run();
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testUnboundedInputRate() {
-    long numElements = 5000;
-
-    long elemsPerPeriod = 10L;
-    Duration periodLength = Duration.millis(8);
-    PCollection<Long> input =
-        p.apply(
-            CountingInput.unbounded()
-                .withRate(elemsPerPeriod, periodLength)
-                .withMaxNumRecords(numElements));
-
-    addCountingAsserts(input, 0, numElements);
-    long expectedRuntimeMillis = (periodLength.getMillis() * numElements) / elemsPerPeriod;
-    Instant startTime = Instant.now();
-    p.run();
-    Instant endTime = Instant.now();
-    assertThat(endTime.isAfter(startTime.plus(expectedRuntimeMillis)), is(true));
-  }
-
-  private static class ElementValueDiff extends DoFn<Long, Long> {
-    @ProcessElement
-    public void processElement(ProcessContext c) throws Exception {
-      c.output(c.element() - c.timestamp().getMillis());
-    }
-  }
-
-  @Test
-  @Category(ValidatesRunner.class)
-  public void testUnboundedInputTimestamps() {
-    long numElements = 1000;
-
-    PCollection<Long> input =
-        p.apply(
-            CountingInput.unbounded()
-                .withTimestampFn(new ValueAsTimestampFn())
-                .withMaxNumRecords(numElements));
-    addCountingAsserts(input, 0, numElements);
-
-    PCollection<Long> diffs =
-        input
-            .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
-            .apply("DistinctTimestamps", Distinct.<Long>create());
-    // This assert also confirms that diffs only has one unique value.
-    PAssert.thatSingleton(diffs).isEqualTo(0L);
-
-    p.run();
-  }
-
-  @Test
-  public void testUnboundedDisplayData() {
-    Duration maxReadTime = Duration.standardHours(5);
-    SerializableFunction<Long, Instant> timestampFn = new SerializableFunction<Long, Instant>() {
-      @Override
-      public Instant apply(Long input) {
-        return Instant.now();
-      }
-    };
-
-    PTransform<?, ?> input = CountingInput.unbounded()
-        .withMaxNumRecords(1234)
-        .withMaxReadTime(maxReadTime)
-        .withTimestampFn(timestampFn);
-
-    DisplayData displayData = DisplayData.from(input);
-
-    assertThat(displayData, hasDisplayItem("maxRecords", 1234));
-    assertThat(displayData, hasDisplayItem("maxReadTime", maxReadTime));
-    assertThat(displayData, hasDisplayItem("timestampFn", timestampFn.getClass()));
-  }
-
-  /**
-   * A timestamp function that uses the given value as the timestamp. Because the input values will
-   * not wrap, this function is non-decreasing and meets the timestamp function criteria laid out
-   * in {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)}.
-   */
-  private static class ValueAsTimestampFn implements SerializableFunction<Long, Instant> {
-    @Override
-    public Instant apply(Long input) {
-      return new Instant(input);
-    }
-  }
-}