You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/08/11 18:41:17 UTC
[1/3] incubator-beam git commit: Remove Counter and associated code
Repository: incubator-beam
Updated Branches:
refs/heads/master 3a858ee9e -> a0769ad2a
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterSetTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterSetTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterSetTest.java
deleted file mode 100644
index bce0a88..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterSetTest.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.common;
-
-import static org.apache.beam.sdk.util.common.Counter.AggregationKind.MAX;
-import static org.apache.beam.sdk.util.common.Counter.AggregationKind.SUM;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Unit tests for {@link CounterSet}.
- */
-@RunWith(JUnit4.class)
-public class CounterSetTest {
- private CounterSet set;
-
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
- @Before
- public void setup() {
- set = new CounterSet();
- }
-
- @Test
- public void testAddWithDifferentNamesAddsAll() {
- Counter<?> c1 = Counter.longs("c1", SUM);
- Counter<?> c2 = Counter.ints("c2", MAX);
-
- boolean c1Add = set.add(c1);
- boolean c2Add = set.add(c2);
-
- assertTrue(c1Add);
- assertTrue(c2Add);
- assertThat(set, containsInAnyOrder(c1, c2));
- }
-
- @Test
- public void testAddWithAlreadyPresentNameReturnsFalse() {
- Counter<?> c1 = Counter.longs("c1", SUM);
- Counter<?> c1Dup = Counter.longs("c1", SUM);
-
- boolean c1Add = set.add(c1);
- boolean c1DupAdd = set.add(c1Dup);
-
- assertTrue(c1Add);
- assertFalse(c1DupAdd);
- assertThat(set, containsInAnyOrder((Counter) c1));
- }
-
- @Test
- public void testAddOrReuseWithAlreadyPresentReturnsPresent() {
- Counter<?> c1 = Counter.longs("c1", SUM);
- Counter<?> c1Dup = Counter.longs("c1", SUM);
-
- Counter<?> c1AddResult = set.addOrReuseCounter(c1);
- Counter<?> c1DupAddResult = set.addOrReuseCounter(c1Dup);
-
- assertSame(c1, c1AddResult);
- assertSame(c1AddResult, c1DupAddResult);
- assertThat(set, containsInAnyOrder((Counter) c1));
- }
-
- @Test
- public void testAddOrReuseWithNoCounterReturnsProvided() {
- Counter<?> c1 = Counter.longs("c1", SUM);
-
- Counter<?> c1AddResult = set.addOrReuseCounter(c1);
-
- assertSame(c1, c1AddResult);
- assertThat(set, containsInAnyOrder((Counter) c1));
- }
-
- @Test
- public void testAddOrReuseWithIncompatibleTypesThrowsException() {
- Counter<?> c1 = Counter.longs("c1", SUM);
- Counter<?> c1Incompatible = Counter.ints("c1", MAX);
-
- set.addOrReuseCounter(c1);
-
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Counter " + c1Incompatible
- + " duplicates incompatible counter " + c1 + " in " + set);
-
- set.addOrReuseCounter(c1Incompatible);
- }
-
- @Test
- public void testMergeWithDifferentNamesAddsAll() {
- Counter<?> c1 = Counter.longs("c1", SUM);
- Counter<?> c2 = Counter.ints("c2", MAX);
-
- set.add(c1);
- set.add(c2);
-
- CounterSet newSet = new CounterSet();
- newSet.merge(set);
-
- assertThat(newSet, containsInAnyOrder(c1, c2));
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testMergeWithSameNamesMerges() {
- Counter<Long> c1 = Counter.longs("c1", SUM);
- Counter<Integer> c2 = Counter.ints("c2", MAX);
-
- set.add(c1);
- set.add(c2);
-
- c1.addValue(3L);
- c2.addValue(22);
-
- CounterSet newSet = new CounterSet();
- Counter<Long> c1Prime = Counter.longs("c1", SUM);
- Counter<Integer> c2Prime = Counter.ints("c2", MAX);
-
- c1Prime.addValue(7L);
- c2Prime.addValue(14);
-
- newSet.add(c1Prime);
- newSet.add(c2Prime);
-
- newSet.merge(set);
-
- assertThat((Counter<Long>) newSet.getExistingCounter("c1"), equalTo(c1Prime));
- assertThat((Long) newSet.getExistingCounter("c1").getAggregate(), equalTo(10L));
-
- assertThat((Counter<Integer>) newSet.getExistingCounter("c2"), equalTo(c2Prime));
- assertThat((Integer) newSet.getExistingCounter("c2").getAggregate(), equalTo(22));
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testMergeWithIncompatibleTypesThrowsException() {
- Counter<Long> c1 = Counter.longs("c1", SUM);
-
- set.add(c1);
-
- CounterSet newSet = new CounterSet();
- Counter<Long> c1Prime = Counter.longs("c1", MAX);
-
- newSet.add(c1Prime);
-
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("c1");
- thrown.expectMessage("incompatible counters with the same name");
-
- newSet.merge(set);
- }
-
- @Test
- public void testAddCounterMutatorAddCounterAddsCounter() {
- Counter<?> c1 = Counter.longs("c1", SUM);
-
- Counter<?> addC1Result = set.getAddCounterMutator().addCounter(c1);
-
- assertSame(c1, addC1Result);
- assertThat(set, containsInAnyOrder((Counter) c1));
- }
-
- @Test
- public void testAddCounterMutatorAddEqualCounterReusesCounter() {
- Counter<?> c1 = Counter.longs("c1", SUM);
- Counter<?> c1dup = Counter.longs("c1", SUM);
-
- Counter<?> addC1Result = set.getAddCounterMutator().addCounter(c1);
- Counter<?> addC1DupResult = set.getAddCounterMutator().addCounter(c1dup);
-
- assertThat(set, containsInAnyOrder((Counter) c1));
- assertSame(c1, addC1Result);
- assertSame(c1, addC1DupResult);
- }
-
- @Test
- public void testAddCounterMutatorIncompatibleTypesThrowsException() {
- Counter<?> c1 = Counter.longs("c1", SUM);
- Counter<?> c1Incompatible = Counter.longs("c1", MAX);
-
- set.getAddCounterMutator().addCounter(c1);
-
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Counter " + c1Incompatible
- + " duplicates incompatible counter " + c1 + " in " + set);
-
- set.getAddCounterMutator().addCounter(c1Incompatible);
- }
-
- @Test
- public void testAddCounterMutatorAddMultipleCounters() {
- Counter<?> c1 = Counter.longs("c1", SUM);
- Counter<?> c2 = Counter.longs("c2", MAX);
-
- set.getAddCounterMutator().addCounter(c1);
- set.getAddCounterMutator().addCounter(c2);
-
- assertThat(set, containsInAnyOrder(c1, c2));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java
deleted file mode 100644
index 79f0cb7..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java
+++ /dev/null
@@ -1,736 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.common;
-
-import static org.apache.beam.sdk.util.Values.asDouble;
-import static org.apache.beam.sdk.util.common.Counter.AggregationKind.AND;
-import static org.apache.beam.sdk.util.common.Counter.AggregationKind.MAX;
-import static org.apache.beam.sdk.util.common.Counter.AggregationKind.MEAN;
-import static org.apache.beam.sdk.util.common.Counter.AggregationKind.MIN;
-import static org.apache.beam.sdk.util.common.Counter.AggregationKind.OR;
-import static org.apache.beam.sdk.util.common.Counter.AggregationKind.SUM;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.util.common.Counter.CommitState;
-import org.apache.beam.sdk.util.common.Counter.CounterMean;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Unit tests for the {@link Counter} API.
- */
-@RunWith(JUnit4.class)
-public class CounterTest {
-
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
- private static void flush(Counter<?> c) {
- switch (c.getKind()) {
- case SUM:
- case MAX:
- case MIN:
- case AND:
- case OR:
- c.getAndResetDelta();
- break;
- case MEAN:
- c.getAndResetMeanDelta();
- break;
- default:
- throw new IllegalArgumentException("Unknown counter kind " + c.getKind());
- }
- }
-
- private static final double EPSILON = 0.00000000001;
-
- @Test
- public void testCompatibility() {
- // Equal counters are compatible, of all kinds.
- assertTrue(
- Counter.longs("c", SUM).isCompatibleWith(Counter.longs("c", SUM)));
- assertTrue(
- Counter.ints("c", SUM).isCompatibleWith(Counter.ints("c", SUM)));
- assertTrue(
- Counter.doubles("c", SUM).isCompatibleWith(Counter.doubles("c", SUM)));
- assertTrue(
- Counter.booleans("c", OR).isCompatibleWith(
- Counter.booleans("c", OR)));
-
- // The name, kind, and type of the counter must match.
- assertFalse(
- Counter.longs("c", SUM).isCompatibleWith(Counter.longs("c2", SUM)));
- assertFalse(
- Counter.longs("c", SUM).isCompatibleWith(Counter.longs("c", MAX)));
- assertFalse(
- Counter.longs("c", SUM).isCompatibleWith(Counter.ints("c", SUM)));
-
- // The value of the counters are ignored.
- assertTrue(
- Counter.longs("c", SUM).resetToValue(666L).isCompatibleWith(
- Counter.longs("c", SUM).resetToValue(42L)));
- }
-
-
- private void assertOK(long total, long delta, Counter<Long> c) {
- assertEquals(total, c.getAggregate().longValue());
- assertEquals(delta, c.getAndResetDelta().longValue());
- }
-
- private void assertOK(double total, double delta, Counter<Double> c) {
- assertEquals(total, asDouble(c.getAggregate()), EPSILON);
- assertEquals(delta, asDouble(c.getAndResetDelta()), EPSILON);
- }
-
-
- // Tests for SUM.
-
- @Test
- public void testSumLong() {
- Counter<Long> c = Counter.longs("sum-long", SUM);
- long expectedTotal = 0;
- long expectedDelta = 0;
- assertOK(expectedTotal, expectedDelta, c);
-
- c.addValue(13L).addValue(42L).addValue(0L);
- expectedTotal += 55;
- expectedDelta += 55;
- assertOK(expectedTotal, expectedDelta, c);
-
- c.resetToValue(120L).addValue(17L).addValue(37L);
- expectedTotal = expectedDelta = 174;
- assertOK(expectedTotal, expectedDelta, c);
-
- flush(c);
- expectedDelta = 0;
- assertOK(expectedTotal, expectedDelta, c);
-
- c.addValue(15L).addValue(42L);
- expectedTotal += 57;
- expectedDelta += 57;
- assertOK(expectedTotal, expectedDelta, c);
-
- c.resetToValue(100L).addValue(17L).addValue(49L);
- expectedTotal = expectedDelta = 166;
- assertOK(expectedTotal, expectedDelta, c);
-
- Counter<Long> other = Counter.longs("sum-long", SUM);
- other.addValue(12L);
- expectedDelta = 12L;
- expectedTotal += 12L;
- c.merge(other);
- assertOK(expectedTotal, expectedDelta, c);
- }
-
- @Test
- public void testSumDouble() {
- Counter<Double> c = Counter.doubles("sum-double", SUM);
- double expectedTotal = 0.0;
- double expectedDelta = 0.0;
- assertOK(expectedTotal, expectedDelta, c);
-
- c.addValue(Math.E).addValue(Math.PI).addValue(0.0);
- expectedTotal += Math.E + Math.PI;
- expectedDelta += Math.E + Math.PI;
- assertOK(expectedTotal, expectedDelta, c);
-
- c.resetToValue(Math.sqrt(2)).addValue(2 * Math.PI).addValue(3 * Math.E);
- expectedTotal = expectedDelta = Math.sqrt(2) + 2 * Math.PI + 3 * Math.E;
- assertOK(expectedTotal, expectedDelta, c);
-
- flush(c);
- expectedDelta = 0.0;
- assertOK(expectedTotal, expectedDelta, c);
-
- c.addValue(7 * Math.PI).addValue(5 * Math.E);
- expectedTotal += 7 * Math.PI + 5 * Math.E;
- expectedDelta += 7 * Math.PI + 5 * Math.E;
- assertOK(expectedTotal, expectedDelta, c);
-
- c.resetToValue(Math.sqrt(17)).addValue(17.0).addValue(49.0);
- expectedTotal = expectedDelta = Math.sqrt(17.0) + 17.0 + 49.0;
- assertOK(expectedTotal, expectedDelta, c);
-
- Counter<Double> other = Counter.doubles("sum-double", SUM);
- other.addValue(12 * Math.PI);
- expectedDelta = 12 * Math.PI;
- expectedTotal += 12 * Math.PI;
- c.merge(other);
- assertOK(expectedTotal, expectedDelta, c);
- }
-
-
- // Tests for MAX.
-
- @Test
- public void testMaxLong() {
- Counter<Long> c = Counter.longs("max-long", MAX);
- long expectedTotal = Long.MIN_VALUE;
- long expectedDelta = Long.MIN_VALUE;
- assertOK(expectedTotal, expectedDelta, c);
-
- c.addValue(13L).addValue(42L).addValue(Long.MIN_VALUE);
- expectedTotal = expectedDelta = 42;
- assertOK(expectedTotal, expectedDelta, c);
-
- c.resetToValue(120L).addValue(17L).addValue(37L);
- expectedTotal = expectedDelta = 120;
- assertOK(expectedTotal, expectedDelta, c);
-
- flush(c);
- expectedDelta = Long.MIN_VALUE;
- assertOK(expectedTotal, expectedDelta, c);
-
- c.addValue(42L).addValue(15L);
- expectedDelta = 42;
- assertOK(expectedTotal, expectedDelta, c);
-
- c.resetToValue(100L).addValue(171L).addValue(49L);
- expectedTotal = expectedDelta = 171;
- assertOK(expectedTotal, expectedDelta, c);
-
- Counter<Long> other = Counter.longs("max-long", MAX);
- other.addValue(12L);
- expectedDelta = 12L;
- c.merge(other);
- assertOK(expectedTotal, expectedDelta, c);
- }
-
- @Test
- public void testMaxDouble() {
- Counter<Double> c = Counter.doubles("max-double", MAX);
- double expectedTotal = Double.NEGATIVE_INFINITY;
- double expectedDelta = Double.NEGATIVE_INFINITY;
- assertOK(expectedTotal, expectedDelta, c);
-
- c.addValue(Math.E).addValue(Math.PI).addValue(Double.NEGATIVE_INFINITY);
- expectedTotal = expectedDelta = Math.PI;
- assertOK(expectedTotal, expectedDelta, c);
-
- c.resetToValue(Math.sqrt(12345)).addValue(2 * Math.PI).addValue(3 * Math.E);
- expectedTotal = expectedDelta = Math.sqrt(12345);
- assertOK(expectedTotal, expectedDelta, c);
-
- flush(c);
- expectedDelta = Double.NEGATIVE_INFINITY;
- assertOK(expectedTotal, expectedDelta, c);
-
- c.addValue(7 * Math.PI).addValue(5 * Math.E);
- expectedDelta = 7 * Math.PI;
- assertOK(expectedTotal, expectedDelta, c);
-
- c.resetToValue(Math.sqrt(17)).addValue(171.0).addValue(49.0);
- expectedTotal = expectedDelta = 171.0;
- assertOK(expectedTotal, expectedDelta, c);
-
- Counter<Double> other = Counter.doubles("max-double", MAX);
- other.addValue(12 * Math.PI);
- expectedDelta = 12 * Math.PI;
- c.merge(other);
- assertOK(expectedTotal, expectedDelta, c);
- }
-
-
- // Tests for MIN.
-
- @Test
- public void testMinLong() {
- Counter<Long> c = Counter.longs("min-long", MIN);
- long expectedTotal = Long.MAX_VALUE;
- long expectedDelta = Long.MAX_VALUE;
- assertOK(expectedTotal, expectedDelta, c);
-
- c.addValue(13L).addValue(42L).addValue(Long.MAX_VALUE);
- expectedTotal = expectedDelta = 13;
- assertOK(expectedTotal, expectedDelta, c);
-
- c.resetToValue(120L).addValue(17L).addValue(37L);
- expectedTotal = expectedDelta = 17;
- assertOK(expectedTotal, expectedDelta, c);
-
- flush(c);
- expectedDelta = Long.MAX_VALUE;
- assertOK(expectedTotal, expectedDelta, c);
-
- c.addValue(42L).addValue(18L);
- expectedDelta = 18;
- assertOK(expectedTotal, expectedDelta, c);
-
- c.resetToValue(100L).addValue(171L).addValue(49L);
- expectedTotal = expectedDelta = 49;
- assertOK(expectedTotal, expectedDelta, c);
-
- Counter<Long> other = Counter.longs("min-long", MIN);
- other.addValue(42L);
- expectedTotal = expectedDelta = 42L;
- c.merge(other);
- assertOK(expectedTotal, expectedDelta, c);
- }
-
- @Test
- public void testMinDouble() {
- Counter<Double> c = Counter.doubles("min-double", MIN);
- double expectedTotal = Double.POSITIVE_INFINITY;
- double expectedDelta = Double.POSITIVE_INFINITY;
- assertOK(expectedTotal, expectedDelta, c);
-
- c.addValue(Math.E).addValue(Math.PI).addValue(Double.POSITIVE_INFINITY);
- expectedTotal = expectedDelta = Math.E;
- assertOK(expectedTotal, expectedDelta, c);
-
- c.resetToValue(Math.sqrt(12345)).addValue(2 * Math.PI).addValue(3 * Math.E);
- expectedTotal = expectedDelta = 2 * Math.PI;
- assertOK(expectedTotal, expectedDelta, c);
-
- flush(c);
- expectedDelta = Double.POSITIVE_INFINITY;
- assertOK(expectedTotal, expectedDelta, c);
-
- c.addValue(7 * Math.PI).addValue(5 * Math.E);
- expectedDelta = 5 * Math.E;
- assertOK(expectedTotal, expectedDelta, c);
-
- c.resetToValue(Math.sqrt(17)).addValue(171.0).addValue(0.0);
- expectedTotal = expectedDelta = 0.0;
- assertOK(expectedTotal, expectedDelta, c);
-
- Counter<Double> other = Counter.doubles("min-double", MIN);
- other.addValue(42 * Math.E);
- expectedDelta = 42 * Math.E;
- c.merge(other);
- assertOK(expectedTotal, expectedDelta, c);
- }
-
-
- // Tests for MEAN.
-
- private void assertMean(long s, long sd, long c, long cd, Counter<Long> cn) {
- CounterMean<Long> mean = cn.getMean();
- CounterMean<Long> deltaMean = cn.getAndResetMeanDelta();
- assertEquals(s, mean.getAggregate().longValue());
- assertEquals(sd, deltaMean.getAggregate().longValue());
- assertEquals(c, mean.getCount());
- assertEquals(cd, deltaMean.getCount());
- }
-
- private void assertMean(double s, double sd, long c, long cd,
- Counter<Double> cn) {
- CounterMean<Double> mean = cn.getMean();
- CounterMean<Double> deltaMean = cn.getAndResetMeanDelta();
- assertEquals(s, mean.getAggregate().doubleValue(), EPSILON);
- assertEquals(sd, deltaMean.getAggregate().doubleValue(), EPSILON);
- assertEquals(c, mean.getCount());
- assertEquals(cd, deltaMean.getCount());
- }
-
- @Test
- public void testMeanLong() {
- Counter<Long> c = Counter.longs("mean-long", MEAN);
- long expTotal = 0;
- long expDelta = 0;
- long expCountTotal = 0;
- long expCountDelta = 0;
- assertMean(expTotal, expDelta, expCountTotal, expCountDelta, c);
-
- c.addValue(13L).addValue(42L).addValue(0L);
- expTotal += 55;
- expDelta += 55;
- expCountTotal += 3;
- expCountDelta += 3;
- assertMean(expTotal, expDelta, expCountTotal, expCountDelta, c);
-
- c.resetMeanToValue(1L, 120L).addValue(17L).addValue(37L);
- expTotal = expDelta = 174;
- assertMean(expTotal, expDelta, expCountTotal, expCountDelta, c);
-
- flush(c);
- expDelta = 0;
- expCountDelta = 0;
- assertMean(expTotal, expDelta, expCountTotal, expCountDelta, c);
-
- c.addValue(15L).addValue(42L);
- expTotal += 57;
- expDelta += 57;
- expCountTotal += 2;
- expCountDelta += 2;
- assertMean(expTotal, expDelta, expCountTotal, expCountDelta, c);
-
- c.resetMeanToValue(3L, 100L).addValue(17L).addValue(49L);
- expTotal = expDelta = 166;
- expCountTotal = expCountDelta = 5;
- assertMean(expTotal, expDelta, expCountTotal, expCountDelta, c);
-
- Counter<Long> other = Counter.longs("mean-long", MEAN);
- other.addValue(12L).addValue(44L).addValue(-5L);
- expTotal += 12L + 44L - 5L;
- expDelta += 12L + 44L - 5L;
- expCountTotal += 3;
- expCountDelta += 3;
- c.merge(other);
- assertMean(expTotal, expDelta, expCountTotal, expCountDelta, c);
- }
-
- @Test
- public void testMeanDouble() {
- Counter<Double> c = Counter.doubles("mean-double", MEAN);
- double expTotal = 0.0;
- double expDelta = 0.0;
- long expCountTotal = 0;
- long expCountDelta = 0;
- assertMean(expTotal, expDelta, expCountTotal, expCountDelta, c);
-
- c.addValue(Math.E).addValue(Math.PI).addValue(0.0);
- expTotal += Math.E + Math.PI;
- expDelta += Math.E + Math.PI;
- expCountTotal += 3;
- expCountDelta += 3;
- assertMean(expTotal, expDelta, expCountTotal, expCountDelta, c);
-
- c.resetMeanToValue(1L, Math.sqrt(2)).addValue(2 * Math.PI)
- .addValue(3 * Math.E);
- expTotal = expDelta = Math.sqrt(2) + 2 * Math.PI + 3 * Math.E;
- assertMean(expTotal, expDelta, expCountTotal, expCountDelta, c);
-
- flush(c);
- expDelta = 0.0;
- expCountDelta = 0;
- assertMean(expTotal, expDelta, expCountTotal, expCountDelta, c);
-
- c.addValue(7 * Math.PI).addValue(5 * Math.E);
- expTotal += 7 * Math.PI + 5 * Math.E;
- expDelta += 7 * Math.PI + 5 * Math.E;
- expCountTotal += 2;
- expCountDelta += 2;
- assertMean(expTotal, expDelta, expCountTotal, expCountDelta, c);
-
- c.resetMeanToValue(3L, Math.sqrt(17)).addValue(17.0).addValue(49.0);
- expTotal = expDelta = Math.sqrt(17.0) + 17.0 + 49.0;
- expCountTotal = expCountDelta = 5;
- assertMean(expTotal, expDelta, expCountTotal, expCountDelta, c);
-
- Counter<Double> other = Counter.doubles("mean-double", MEAN);
- other.addValue(3 * Math.PI).addValue(12 * Math.E);
- expTotal += 3 * Math.PI + 12 * Math.E;
- expDelta += 3 * Math.PI + 12 * Math.E;
- expCountTotal += 2;
- expCountDelta += 2;
- c.merge(other);
- assertMean(expTotal, expDelta, expCountTotal, expCountDelta, c);
- }
-
-
- // Test for AND and OR.
- private void assertBool(boolean total, boolean delta, Counter<Boolean> c) {
- assertEquals(total, c.getAggregate().booleanValue());
- assertEquals(delta, c.getAndResetDelta().booleanValue());
- }
-
- @Test
- public void testBoolAnd() {
- Counter<Boolean> c = Counter.booleans("bool-and", AND);
- boolean expectedTotal = true;
- boolean expectedDelta = true;
- assertBool(expectedTotal, expectedDelta, c);
-
- c.addValue(true);
- assertBool(expectedTotal, expectedDelta, c);
-
- c.addValue(false);
- expectedTotal = expectedDelta = false;
- assertBool(expectedTotal, expectedDelta, c);
-
- c.resetToValue(true).addValue(true);
- expectedTotal = expectedDelta = true;
- assertBool(expectedTotal, expectedDelta, c);
-
- c.addValue(false);
- expectedTotal = expectedDelta = false;
- assertBool(expectedTotal, expectedDelta, c);
-
- flush(c);
- expectedDelta = true;
- assertBool(expectedTotal, expectedDelta, c);
-
- c.addValue(false);
- expectedDelta = false;
- assertBool(expectedTotal, expectedDelta, c);
- }
-
- @Test
- public void testBoolOr() {
- Counter<Boolean> c = Counter.booleans("bool-or", OR);
- boolean expectedTotal = false;
- boolean expectedDelta = false;
- assertBool(expectedTotal, expectedDelta, c);
-
- c.addValue(false);
- assertBool(expectedTotal, expectedDelta, c);
-
- c.addValue(true);
- expectedTotal = expectedDelta = true;
- assertBool(expectedTotal, expectedDelta, c);
-
- c.resetToValue(false).addValue(false);
- expectedTotal = expectedDelta = false;
- assertBool(expectedTotal, expectedDelta, c);
-
- c.addValue(true);
- expectedTotal = expectedDelta = true;
- assertBool(expectedTotal, expectedDelta, c);
-
- flush(c);
- expectedDelta = false;
- assertBool(expectedTotal, expectedDelta, c);
-
- c.addValue(true);
- expectedDelta = true;
- assertBool(expectedTotal, expectedDelta, c);
- }
-
- // Incompatibility tests.
-
- @Test(expected = IllegalArgumentException.class)
- public void testSumBool() {
- Counter.booleans("counter", SUM);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testMinBool() {
- Counter.booleans("counter", MIN);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testMaxBool() {
- Counter.booleans("counter", MAX);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testMeanBool() {
- Counter.booleans("counter", MEAN);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testAndLong() {
- Counter.longs("counter", AND);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testAndDouble() {
- Counter.doubles("counter", AND);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testOrLong() {
- Counter.longs("counter", OR);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testOrDouble() {
- Counter.doubles("counter", OR);
- }
-
- @Test
- public void testMergeIncompatibleCounters() {
- Counter<Long> longSums = Counter.longs("longsums", SUM);
- Counter<Long> longMean = Counter.longs("longmean", MEAN);
- Counter<Long> longMin = Counter.longs("longmin", MIN);
-
- Counter<Long> otherLongSums = Counter.longs("othersums", SUM);
- Counter<Long> otherLongMean = Counter.longs("otherlongmean", MEAN);
-
- Counter<Double> doubleSums = Counter.doubles("doublesums", SUM);
- Counter<Double> doubleMean = Counter.doubles("doublemean", MEAN);
-
- Counter<Boolean> boolAnd = Counter.booleans("and", AND);
- Counter<Boolean> boolOr = Counter.booleans("or", OR);
-
- List<Counter<Long>> longCounters =
- Arrays.asList(longSums, longMean, longMin, otherLongSums, otherLongMean);
- for (Counter<Long> left : longCounters) {
- for (Counter<Long> right : longCounters) {
- if (left != right) {
- assertIncompatibleMerge(left, right);
- }
- }
- }
-
- assertIncompatibleMerge(doubleSums, doubleMean);
- assertIncompatibleMerge(boolAnd, boolOr);
- }
-
- private <T> void assertIncompatibleMerge(Counter<T> left, Counter<T> right) {
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Counters");
- thrown.expectMessage("are incompatible");
- left.merge(right);
- }
-
- @Test
- public void testDirtyBit() {
- Counter<Long> longSum = Counter.longs("long-sum", SUM);
- Counter<Long> longMean = Counter.longs("long-mean", MEAN);
- Counter<Double> doubleSum = Counter.doubles("double-sum", SUM);
- Counter<Double> doubleMean = Counter.doubles("double-sum", MEAN);
- Counter<Integer> intSum = Counter.ints("int-sum", SUM);
- Counter<Integer> intMean = Counter.ints("int-sum", MEAN);
- Counter<Boolean> boolAnd = Counter.booleans("and", AND);
-
- // Test counters are not dirty and are COMMITTED initially.
- assertFalse(longSum.isDirty());
- assertFalse(longMean.isDirty());
- assertFalse(doubleSum.isDirty());
- assertFalse(doubleMean.isDirty());
- assertFalse(intSum.isDirty());
- assertFalse(intMean.isDirty());
- assertFalse(boolAnd.isDirty());
-
- assertEquals(CommitState.COMMITTED, longSum.commitState.get());
- assertEquals(CommitState.COMMITTED, longMean.commitState.get());
- assertEquals(CommitState.COMMITTED, doubleSum.commitState.get());
- assertEquals(CommitState.COMMITTED, doubleMean.commitState.get());
- assertEquals(CommitState.COMMITTED, intSum.commitState.get());
- assertEquals(CommitState.COMMITTED, intMean.commitState.get());
- assertEquals(CommitState.COMMITTED, boolAnd.commitState.get());
-
- // Test counters are dirty after mutating.
- longSum.addValue(1L);
- longMean.resetMeanToValue(1L, 1L);
- doubleSum.addValue(1.0);
- doubleMean.resetMeanToValue(1L, 1.0);
- intSum.addValue(1);
- intMean.resetMeanToValue(1, 1);
- boolAnd.addValue(true);
-
- assertTrue(longSum.isDirty());
- assertTrue(longMean.isDirty());
- assertTrue(doubleSum.isDirty());
- assertTrue(doubleMean.isDirty());
- assertTrue(intSum.isDirty());
- assertTrue(intMean.isDirty());
- assertTrue(boolAnd.isDirty());
-
- assertEquals(CommitState.DIRTY, longSum.commitState.get());
- assertEquals(CommitState.DIRTY, longMean.commitState.get());
- assertEquals(CommitState.DIRTY, doubleSum.commitState.get());
- assertEquals(CommitState.DIRTY, doubleMean.commitState.get());
- assertEquals(CommitState.DIRTY, intSum.commitState.get());
- assertEquals(CommitState.DIRTY, intMean.commitState.get());
- assertEquals(CommitState.DIRTY, boolAnd.commitState.get());
-
- // Test counters are dirty and are COMMITTING.
- assertTrue(longSum.committing());
- assertTrue(longMean.committing());
- assertTrue(doubleSum.committing());
- assertTrue(doubleMean.committing());
- assertTrue(intSum.committing());
- assertTrue(intMean.committing());
- assertTrue(boolAnd.committing());
-
- assertTrue(longSum.isDirty());
- assertTrue(longMean.isDirty());
- assertTrue(doubleSum.isDirty());
- assertTrue(doubleMean.isDirty());
- assertTrue(intSum.isDirty());
- assertTrue(intMean.isDirty());
- assertTrue(boolAnd.isDirty());
-
- assertEquals(CommitState.COMMITTING, longSum.commitState.get());
- assertEquals(CommitState.COMMITTING, longMean.commitState.get());
- assertEquals(CommitState.COMMITTING, doubleSum.commitState.get());
- assertEquals(CommitState.COMMITTING, doubleMean.commitState.get());
- assertEquals(CommitState.COMMITTING, intSum.commitState.get());
- assertEquals(CommitState.COMMITTING, intMean.commitState.get());
- assertEquals(CommitState.COMMITTING, boolAnd.commitState.get());
-
- // Test counters are dirty again after mutating.
- longSum.addValue(1L);
- longMean.resetMeanToValue(1L, 1L);
- doubleSum.addValue(1.0);
- doubleMean.resetMeanToValue(1L, 1.0);
- intSum.addValue(1);
- intMean.resetMeanToValue(1, 1);
- boolAnd.addValue(true);
-
- assertFalse(longSum.committed());
- assertFalse(longMean.committed());
- assertFalse(doubleSum.committed());
- assertFalse(doubleMean.committed());
- assertFalse(intSum.committed());
- assertFalse(intMean.committed());
- assertFalse(boolAnd.committed());
-
- assertTrue(longSum.isDirty());
- assertTrue(longMean.isDirty());
- assertTrue(doubleSum.isDirty());
- assertTrue(doubleMean.isDirty());
- assertTrue(intSum.isDirty());
- assertTrue(intMean.isDirty());
- assertTrue(boolAnd.isDirty());
-
- assertEquals(CommitState.DIRTY, longSum.commitState.get());
- assertEquals(CommitState.DIRTY, longMean.commitState.get());
- assertEquals(CommitState.DIRTY, doubleSum.commitState.get());
- assertEquals(CommitState.DIRTY, doubleMean.commitState.get());
- assertEquals(CommitState.DIRTY, intSum.commitState.get());
- assertEquals(CommitState.DIRTY, intMean.commitState.get());
- assertEquals(CommitState.DIRTY, boolAnd.commitState.get());
-
- // Test counters are not dirty and are COMMITTED.
- assertTrue(longSum.committing());
- assertTrue(longMean.committing());
- assertTrue(doubleSum.committing());
- assertTrue(doubleMean.committing());
- assertTrue(intSum.committing());
- assertTrue(intMean.committing());
- assertTrue(boolAnd.committing());
-
- assertTrue(longSum.committed());
- assertTrue(longMean.committed());
- assertTrue(doubleSum.committed());
- assertTrue(doubleMean.committed());
- assertTrue(intSum.committed());
- assertTrue(intMean.committed());
- assertTrue(boolAnd.committed());
-
- assertFalse(longSum.isDirty());
- assertFalse(longMean.isDirty());
- assertFalse(doubleSum.isDirty());
- assertFalse(doubleMean.isDirty());
- assertFalse(intSum.isDirty());
- assertFalse(intMean.isDirty());
- assertFalse(boolAnd.isDirty());
-
- assertEquals(CommitState.COMMITTED, longSum.commitState.get());
- assertEquals(CommitState.COMMITTED, longMean.commitState.get());
- assertEquals(CommitState.COMMITTED, doubleSum.commitState.get());
- assertEquals(CommitState.COMMITTED, doubleMean.commitState.get());
- assertEquals(CommitState.COMMITTED, intSum.commitState.get());
- assertEquals(CommitState.COMMITTED, intMean.commitState.get());
- assertEquals(CommitState.COMMITTED, boolAnd.commitState.get());
- }
-}
[2/3] incubator-beam git commit: Remove Counter and associated code
Posted by bc...@apache.org.
Remove Counter and associated code
Aggregator is the model level concept. Counter was specific to the
Dataflow Runner, and is now not needed as part of Beam.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d20a7ada
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d20a7ada
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d20a7ada
Branch: refs/heads/master
Commit: d20a7ada7eb3ee714917e7c334e1b15ecc2c3b03
Parents: 2a1055d
Author: bchambers <bc...@google.com>
Authored: Fri Jul 29 09:41:17 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Thu Aug 11 10:26:04 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/util/DoFnRunners.java | 78 --
.../beam/runners/dataflow/DataflowRunner.java | 4 +-
.../org/apache/beam/sdk/transforms/Combine.java | 13 -
.../org/apache/beam/sdk/transforms/Max.java | 27 +-
.../org/apache/beam/sdk/transforms/Min.java | 28 +-
.../org/apache/beam/sdk/transforms/Sum.java | 27 +-
.../apache/beam/sdk/util/CounterAggregator.java | 128 --
.../apache/beam/sdk/util/common/Counter.java | 1287 ------------------
.../beam/sdk/util/common/CounterName.java | 153 ---
.../beam/sdk/util/common/CounterProvider.java | 27 -
.../apache/beam/sdk/util/common/CounterSet.java | 179 ---
.../util/common/ElementByteSizeObserver.java | 24 +-
.../beam/sdk/util/CounterAggregatorTest.java | 256 ----
.../beam/sdk/util/common/CounterSetTest.java | 227 ---
.../beam/sdk/util/common/CounterTest.java | 736 ----------
15 files changed, 15 insertions(+), 3179 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
index a9f3cf4..6089228 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
@@ -23,8 +23,6 @@ import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.DoFnRunner.ReduceFnExecutor;
import org.apache.beam.sdk.util.ExecutionContext.StepContext;
-import org.apache.beam.sdk.util.common.CounterSet;
-import org.apache.beam.sdk.util.common.CounterSet.AddCounterMutator;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
@@ -72,33 +70,6 @@ public class DoFnRunners {
}
/**
- * Returns a basic implementation of {@link DoFnRunner} that works for most
- * {@link OldDoFn OldDoFns}.
- *
- * <p>It invokes {@link OldDoFn#processElement} for each input.
- */
- public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
- PipelineOptions options,
- OldDoFn<InputT, OutputT> fn,
- SideInputReader sideInputReader,
- OutputManager outputManager,
- TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> sideOutputTags,
- StepContext stepContext,
- CounterSet.AddCounterMutator addCounterMutator,
- WindowingStrategy<?, ?> windowingStrategy) {
- return simpleRunner(options,
- fn,
- sideInputReader,
- outputManager,
- mainOutputTag,
- sideOutputTags,
- stepContext,
- CounterAggregator.factoryFor(addCounterMutator),
- windowingStrategy);
- }
-
- /**
* Returns an implementation of {@link DoFnRunner} that handles late data dropping.
*
* <p>It drops elements from expired windows before they reach the underlying {@link OldDoFn}.
@@ -132,33 +103,6 @@ public class DoFnRunners {
reduceFnExecutor.getDroppedDueToLatenessAggregator());
}
- /**
- * Returns an implementation of {@link DoFnRunner} that handles late data dropping.
- *
- * <p>It drops elements from expired windows before they reach the underlying {@link OldDoFn}.
- */
- public static <K, InputT, OutputT, W extends BoundedWindow>
- DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner(
- PipelineOptions options,
- ReduceFnExecutor<K, InputT, OutputT, W> reduceFnExecutor,
- SideInputReader sideInputReader,
- OutputManager outputManager,
- TupleTag<KV<K, OutputT>> mainOutputTag,
- List<TupleTag<?>> sideOutputTags,
- StepContext stepContext,
- CounterSet.AddCounterMutator addCounterMutator,
- WindowingStrategy<?, W> windowingStrategy) {
- return lateDataDroppingRunner(
- options,
- reduceFnExecutor,
- sideInputReader,
- outputManager,
- mainOutputTag,
- sideOutputTags,
- stepContext,
- CounterAggregator.factoryFor(addCounterMutator),
- windowingStrategy);
- }
public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault(
PipelineOptions options,
@@ -197,26 +141,4 @@ public class DoFnRunners {
aggregatorFactory,
windowingStrategy);
}
-
- public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault(
- PipelineOptions options,
- OldDoFn<InputT, OutputT> doFn,
- SideInputReader sideInputReader,
- OutputManager outputManager,
- TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> sideOutputTags,
- StepContext stepContext,
- AddCounterMutator addCounterMutator,
- WindowingStrategy<?, ?> windowingStrategy) {
- return createDefault(
- options,
- doFn,
- sideInputReader,
- outputManager,
- mainOutputTag,
- sideOutputTags,
- stepContext,
- CounterAggregator.factoryFor(addCounterMutator),
- windowingStrategy);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index bea6264..667a63b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -212,9 +212,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
// Default Docker container images that execute Dataflow worker harness, residing in Google
// Container Registry, separately for Batch and Streaming.
public static final String BATCH_WORKER_HARNESS_CONTAINER_IMAGE =
- "dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20160804-dofn";
+ "dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20160810";
public static final String STREAMING_WORKER_HARNESS_CONTAINER_IMAGE =
- "dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20160804-dofn";
+ "dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20160810";
// The limit of CreateJob request size.
private static final int CREATE_JOB_REQUEST_LIMIT_BYTES = 10 * 1024 * 1024;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index a825800..6ba3f8a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -49,7 +49,6 @@ import org.apache.beam.sdk.util.PerKeyCombineFnRunners;
import org.apache.beam.sdk.util.PropertyNames;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.common.Counter;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
@@ -736,10 +735,6 @@ public class Combine {
return new int[] { value };
}
- public Counter<Integer> getCounter(@SuppressWarnings("unused") String name) {
- throw new UnsupportedOperationException("BinaryCombineIntegerFn does not support getCounter");
- }
-
private static final class ToIntegerCodingFunction
implements DelegateCoder.CodingFunction<int[], Integer> {
@Override
@@ -839,10 +834,6 @@ public class Combine {
return new long[] { value };
}
- public Counter<Long> getCounter(@SuppressWarnings("unused") String name) {
- throw new UnsupportedOperationException("BinaryCombineLongFn does not support getCounter");
- }
-
private static final class ToLongCodingFunction
implements DelegateCoder.CodingFunction<long[], Long> {
@Override
@@ -944,10 +935,6 @@ public class Combine {
return new double[] { value };
}
- public Counter<Double> getCounter(@SuppressWarnings("unused") String name) {
- throw new UnsupportedOperationException("BinaryCombineDoubleFn does not support getCounter");
- }
-
private static final class ToDoubleCodingFunction
implements DelegateCoder.CodingFunction<double[], Double> {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
index 52617b6..eed13fb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
@@ -19,9 +19,6 @@ package org.apache.beam.sdk.transforms;
import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.common.Counter;
-import org.apache.beam.sdk.util.common.Counter.AggregationKind;
-import org.apache.beam.sdk.util.common.CounterProvider;
import java.io.Serializable;
import java.util.Comparator;
@@ -218,8 +215,7 @@ public class Max {
* A {@code CombineFn} that computes the maximum of a collection of {@code Integer}s, useful as an
* argument to {@link Combine#globally} or {@link Combine#perKey}.
*/
- public static class MaxIntegerFn extends Combine.BinaryCombineIntegerFn
- implements CounterProvider<Integer> {
+ public static class MaxIntegerFn extends Combine.BinaryCombineIntegerFn {
@Override
public int apply(int left, int right) {
return left >= right ? left : right;
@@ -229,19 +225,13 @@ public class Max {
public int identity() {
return Integer.MIN_VALUE;
}
-
- @Override
- public Counter<Integer> getCounter(String name) {
- return Counter.ints(name, AggregationKind.MAX);
- }
}
/**
* A {@code CombineFn} that computes the maximum of a collection of {@code Long}s, useful as an
* argument to {@link Combine#globally} or {@link Combine#perKey}.
*/
- public static class MaxLongFn extends Combine.BinaryCombineLongFn
- implements CounterProvider<Long> {
+ public static class MaxLongFn extends Combine.BinaryCombineLongFn {
@Override
public long apply(long left, long right) {
return left >= right ? left : right;
@@ -251,19 +241,13 @@ public class Max {
public long identity() {
return Long.MIN_VALUE;
}
-
- @Override
- public Counter<Long> getCounter(String name) {
- return Counter.longs(name, AggregationKind.MAX);
- }
}
/**
* A {@code CombineFn} that computes the maximum of a collection of {@code Double}s, useful as an
* argument to {@link Combine#globally} or {@link Combine#perKey}.
*/
- public static class MaxDoubleFn extends Combine.BinaryCombineDoubleFn
- implements CounterProvider<Double> {
+ public static class MaxDoubleFn extends Combine.BinaryCombineDoubleFn {
@Override
public double apply(double left, double right) {
return left >= right ? left : right;
@@ -273,10 +257,5 @@ public class Max {
public double identity() {
return Double.NEGATIVE_INFINITY;
}
-
- @Override
- public Counter<Double> getCounter(String name) {
- return Counter.doubles(name, AggregationKind.MAX);
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
index 3159134..9c9d14f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
@@ -19,9 +19,6 @@ package org.apache.beam.sdk.transforms;
import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.common.Counter;
-import org.apache.beam.sdk.util.common.Counter.AggregationKind;
-import org.apache.beam.sdk.util.common.CounterProvider;
import java.io.Serializable;
import java.util.Comparator;
@@ -218,8 +215,7 @@ public class Min {
* A {@code CombineFn} that computes the minimum of a collection of {@code Integer}s, useful as an
* argument to {@link Combine#globally} or {@link Combine#perKey}.
*/
- public static class MinIntegerFn extends Combine.BinaryCombineIntegerFn
- implements CounterProvider<Integer> {
+ public static class MinIntegerFn extends Combine.BinaryCombineIntegerFn {
@Override
public int apply(int left, int right) {
@@ -230,20 +226,13 @@ public class Min {
public int identity() {
return Integer.MAX_VALUE;
}
-
- @Override
- public Counter<Integer> getCounter(String name) {
- return Counter.ints(name, AggregationKind.MIN);
- }
}
/**
* A {@code CombineFn} that computes the minimum of a collection of {@code Long}s, useful as an
* argument to {@link Combine#globally} or {@link Combine#perKey}.
*/
- public static class MinLongFn extends Combine.BinaryCombineLongFn
- implements CounterProvider<Long> {
-
+ public static class MinLongFn extends Combine.BinaryCombineLongFn {
@Override
public long apply(long left, long right) {
return left <= right ? left : right;
@@ -253,19 +242,13 @@ public class Min {
public long identity() {
return Long.MAX_VALUE;
}
-
- @Override
- public Counter<Long> getCounter(String name) {
- return Counter.longs(name, AggregationKind.MIN);
- }
}
/**
* A {@code CombineFn} that computes the minimum of a collection of {@code Double}s, useful as an
* argument to {@link Combine#globally} or {@link Combine#perKey}.
*/
- public static class MinDoubleFn extends Combine.BinaryCombineDoubleFn
- implements CounterProvider<Double> {
+ public static class MinDoubleFn extends Combine.BinaryCombineDoubleFn {
@Override
public double apply(double left, double right) {
@@ -276,10 +259,5 @@ public class Min {
public double identity() {
return Double.POSITIVE_INFINITY;
}
-
- @Override
- public Counter<Double> getCounter(String name) {
- return Counter.doubles(name, AggregationKind.MIN);
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
index 07f78b5..27c5ced 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
@@ -17,10 +17,6 @@
*/
package org.apache.beam.sdk.transforms;
-import org.apache.beam.sdk.util.common.Counter;
-import org.apache.beam.sdk.util.common.Counter.AggregationKind;
-import org.apache.beam.sdk.util.common.CounterProvider;
-
/**
* {@code PTransform}s for computing the sum of the elements in a
* {@code PCollection}, or the sum of the values associated with
@@ -123,8 +119,7 @@ public class Sum {
* {@code Iterable} of {@code Integer}s, useful as an argument to
* {@link Combine#globally} or {@link Combine#perKey}.
*/
- public static class SumIntegerFn
- extends Combine.BinaryCombineIntegerFn implements CounterProvider<Integer> {
+ public static class SumIntegerFn extends Combine.BinaryCombineIntegerFn {
@Override
public int apply(int a, int b) {
return a + b;
@@ -134,11 +129,6 @@ public class Sum {
public int identity() {
return 0;
}
-
- @Override
- public Counter<Integer> getCounter(String name) {
- return Counter.ints(name, AggregationKind.SUM);
- }
}
/**
@@ -147,7 +137,7 @@ public class Sum {
* {@link Combine#globally} or {@link Combine#perKey}.
*/
public static class SumLongFn
- extends Combine.BinaryCombineLongFn implements CounterProvider<Long> {
+ extends Combine.BinaryCombineLongFn {
@Override
public long apply(long a, long b) {
return a + b;
@@ -157,11 +147,6 @@ public class Sum {
public long identity() {
return 0;
}
-
- @Override
- public Counter<Long> getCounter(String name) {
- return Counter.longs(name, AggregationKind.SUM);
- }
}
/**
@@ -169,8 +154,7 @@ public class Sum {
* {@code Iterable} of {@code Double}s, useful as an argument to
* {@link Combine#globally} or {@link Combine#perKey}.
*/
- public static class SumDoubleFn
- extends Combine.BinaryCombineDoubleFn implements CounterProvider<Double> {
+ public static class SumDoubleFn extends Combine.BinaryCombineDoubleFn {
@Override
public double apply(double a, double b) {
return a + b;
@@ -180,10 +164,5 @@ public class Sum {
public double identity() {
return 0;
}
-
- @Override
- public Counter<Double> getCounter(String name) {
- return Counter.doubles(name, AggregationKind.SUM);
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java
deleted file mode 100644
index 5bde8ef..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.util.common.Counter;
-import org.apache.beam.sdk.util.common.CounterProvider;
-import org.apache.beam.sdk.util.common.CounterSet;
-import org.apache.beam.sdk.util.common.CounterSet.AddCounterMutator;
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * An implementation of the {@code Aggregator} interface that uses a
- * {@link Counter} as the underlying representation. Supports {@link CombineFn}s
- * from the {@link Sum}, {@link Min} and {@link Max} classes.
- *
- * @param <InputT> the type of input values
- * @param <AccumT> the type of accumulator values
- * @param <OutputT> the type of output value
- */
-public class CounterAggregator<InputT, AccumT, OutputT>
- implements Aggregator<InputT, OutputT> {
-
- private static class CounterAggregatorFactory implements AggregatorFactory {
- private final AddCounterMutator addCounterMutator;
-
- private CounterAggregatorFactory(CounterSet.AddCounterMutator addCounterMutator) {
- this.addCounterMutator = addCounterMutator;
- }
-
- @Override
- public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
- Class<?> fnClass, ExecutionContext.StepContext stepContext,
- String userName, CombineFn<InputT, AccumT, OutputT> combine) {
- boolean isSystem = fnClass.isAnnotationPresent(SystemDoFnInternal.class);
- String mangledName = (isSystem ? "" : "user-") + stepContext.getStepName() + "-" + userName;
-
- return new CounterAggregator<>(mangledName, combine, addCounterMutator);
- }
- }
-
- private final Counter<InputT> counter;
- private final CombineFn<InputT, AccumT, OutputT> combiner;
-
- /**
- * Create a factory for producing {@link CounterAggregator CounterAggregators} backed by the given
- * {@link CounterSet.AddCounterMutator}.
- */
- public static AggregatorFactory factoryFor(
- CounterSet.AddCounterMutator addCounterMutator) {
- return new CounterAggregatorFactory(addCounterMutator);
- }
-
- /**
- * Constructs a new aggregator with the given name and aggregation logic
- * specified in the CombineFn argument. The underlying counter is
- * automatically added into the provided CounterSet.
- *
- * <p>If a counter with the same name already exists, it will be reused, as
- * long as it has the same type.
- */
- @VisibleForTesting CounterAggregator(
- String name, CombineFn<? super InputT, AccumT, OutputT> combiner,
- CounterSet.AddCounterMutator addCounterMutator) {
- // Safe contravariant cast
- this(constructCounter(name, combiner), addCounterMutator,
- (CombineFn<InputT, AccumT, OutputT>) combiner);
- }
-
- private CounterAggregator(Counter<InputT> counter,
- CounterSet.AddCounterMutator addCounterMutator,
- CombineFn<InputT, AccumT, OutputT> combiner) {
- try {
- this.counter = addCounterMutator.addCounter(counter);
- } catch (IllegalArgumentException ex) {
- throw new IllegalArgumentException(
- "aggregator's name collides with an existing aggregator "
- + "or system-provided counter of an incompatible type");
- }
- this.combiner = combiner;
- }
-
- private static <T> Counter<T> constructCounter(String name,
- CombineFn<? super T, ?, ?> combiner) {
- if (combiner instanceof CounterProvider) {
- @SuppressWarnings("unchecked")
- CounterProvider<T> counterProvider = (CounterProvider<T>) combiner;
- return counterProvider.getCounter(name);
- } else {
- throw new IllegalArgumentException("unsupported combiner in Aggregator: "
- + combiner.getClass().getName());
- }
- }
-
- @Override
- public void addValue(InputT value) {
- counter.addValue(value);
- }
-
- @Override
- public String getName() {
- return counter.getFlatName();
- }
-
- @Override
- public CombineFn<InputT, ?, OutputT> getCombineFn() {
- return combiner;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java
deleted file mode 100644
index 550c648..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java
+++ /dev/null
@@ -1,1287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.common;
-
-import static org.apache.beam.sdk.util.common.Counter.AggregationKind.AND;
-import static org.apache.beam.sdk.util.common.Counter.AggregationKind.MEAN;
-import static org.apache.beam.sdk.util.common.Counter.AggregationKind.OR;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.AtomicDouble;
-
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.annotation.Nullable;
-
-/**
- * A Counter enables the aggregation of a stream of values over time. The
- * cumulative aggregate value is updated as new values are added, or it can be
- * reset to a new value. Multiple kinds of aggregation are supported depending
- * on the type of the counter.
- *
- * <p>Counters compare using value equality of their name, kind, and
- * cumulative value. Equal counters should have equal toString()s.
- *
- * <p>After all possible mutations have completed, the reader should check
- * {@link #isDirty} for every counter, otherwise updates may be lost.
- *
- * <p>A counter may become dirty without a corresponding update to the value.
- * This generally will occur when the calls to {@code addValue()}, {@code committing()},
- * and {@code committed()} are interleaved such that the value is updated
- * between the calls to committing and the read of the value.
- *
- * @param <T> the type of values aggregated by this counter
- */
-public abstract class Counter<T> {
- /**
- * Possible kinds of counter aggregation.
- */
- public static enum AggregationKind {
-
- /**
- * Computes the sum of all added values.
- * Applicable to {@link Integer}, {@link Long}, and {@link Double} values.
- */
- SUM,
-
- /**
- * Computes the maximum value of all added values.
- * Applicable to {@link Integer}, {@link Long}, and {@link Double} values.
- */
- MAX,
-
- /**
- * Computes the minimum value of all added values.
- * Applicable to {@link Integer}, {@link Long}, and {@link Double} values.
- */
- MIN,
-
- /**
- * Computes the arithmetic mean of all added values. Applicable to
- * {@link Integer}, {@link Long}, and {@link Double} values.
- */
- MEAN,
-
- /**
- * Computes boolean AND over all added values.
- * Applicable only to {@link Boolean} values.
- */
- AND,
-
- /**
- * Computes boolean OR over all added values. Applicable only to
- * {@link Boolean} values.
- */
- OR
- // TODO: consider adding VECTOR_SUM, HISTOGRAM, KV_SET, PRODUCT, TOP.
- }
-
- /**
- * Constructs a new {@link Counter} that aggregates {@link Integer}, values
- * according to the desired aggregation kind. The supported aggregation kinds
- * are {@link AggregationKind#SUM}, {@link AggregationKind#MIN},
- * {@link AggregationKind#MAX}, and {@link AggregationKind#MEAN}.
- * This is a convenience wrapper over a
- * {@link Counter} implementation that aggregates {@link Long} values. This is
- * useful when the application handles (boxed) {@link Integer} values that
- * are not readily convertible to the (boxed) {@link Long} values otherwise
- * expected by the {@link Counter} implementation aggregating {@link Long}
- * values.
- *
- * @param name the name of the new counter
- * @param kind the new counter's aggregation kind
- * @return the newly constructed Counter
- * @throws IllegalArgumentException if the aggregation kind is not supported
- */
- public static Counter<Integer> ints(CounterName name, AggregationKind kind) {
- return new IntegerCounter(name, kind);
- }
-
- /**
- * Constructs a new {@code Counter<Integer>} with an unstructured name.
- *
- * @deprecated use {@link #ints(CounterName, AggregationKind)}.
- */
- @Deprecated
- public static Counter<Integer> ints(String name, AggregationKind kind) {
- return new IntegerCounter(CounterName.named(name), kind);
- }
-
- /**
- * Constructs a new {@link Counter} that aggregates {@link Long} values
- * according to the desired aggregation kind. The supported aggregation kinds
- * are {@link AggregationKind#SUM}, {@link AggregationKind#MIN},
- * {@link AggregationKind#MAX}, and {@link AggregationKind#MEAN}.
- *
- * @param name the name of the new counter
- * @param kind the new counter's aggregation kind
- * @return the newly constructed Counter
- * @throws IllegalArgumentException if the aggregation kind is not supported
- */
- public static Counter<Long> longs(CounterName name, AggregationKind kind) {
- return new LongCounter(name, kind);
- }
-
- /**
- * Constructs a new {@code Counter<Long>} with an unstructured name.
- *
- * @deprecated use {@link #longs(CounterName, AggregationKind)}.
- */
- @Deprecated
- public static Counter<Long> longs(String name, AggregationKind kind) {
- return new LongCounter(CounterName.named(name), kind);
- }
-
- /**
- * Constructs a new {@link Counter} that aggregates {@link Double} values
- * according to the desired aggregation kind. The supported aggregation kinds
- * are {@link AggregationKind#SUM}, {@link AggregationKind#MIN},
- * {@link AggregationKind#MAX}, and {@link AggregationKind#MEAN}.
- *
- * @param name the name of the new counter
- * @param kind the new counter's aggregation kind
- * @return the newly constructed Counter
- * @throws IllegalArgumentException if the aggregation kind is not supported
- */
- public static Counter<Double> doubles(CounterName name, AggregationKind kind) {
- return new DoubleCounter(name, kind);
- }
-
- /**
- * Constructs a new {@code Counter<Double>} with an unstructured name.
- *
- * @deprecated use {@link #doubles(CounterName, AggregationKind)}.
- */
- @Deprecated
- public static Counter<Double> doubles(String name, AggregationKind kind) {
- return new DoubleCounter(CounterName.named(name), kind);
- }
-
- /**
- * Constructs a new {@link Counter} that aggregates {@link Boolean} values
- * according to the desired aggregation kind. The only supported aggregation
- * kinds are {@link AggregationKind#AND} and {@link AggregationKind#OR}.
- *
- * @param name the name of the new counter
- * @param kind the new counter's aggregation kind
- * @return the newly constructed Counter
- * @throws IllegalArgumentException if the aggregation kind is not supported
- */
- public static Counter<Boolean> booleans(CounterName name, AggregationKind kind) {
- return new BooleanCounter(name, kind);
- }
-
- /**
- * Constructs a new {@code Counter<Boolean>} with an unstructured name.
- *
- * @deprecated use {@link #booleans(CounterName, AggregationKind)}.
- */
- @Deprecated
- public static Counter<Boolean> booleans(String name, AggregationKind kind) {
- return new BooleanCounter(CounterName.named(name), kind);
- }
-
- //////////////////////////////////////////////////////////////////////////////
-
- /**
- * Adds a new value to the aggregation stream. Returns this (to allow method
- * chaining).
- */
- public abstract Counter<T> addValue(T value);
-
- /**
- * Resets the aggregation stream to this new value. This aggregator must not
- * be a MEAN aggregator. Returns this (to allow method chaining).
- */
- public abstract Counter<T> resetToValue(T value);
-
- /**
- * Resets the aggregation stream to this new value. Returns this (to allow
- * method chaining). The value of elementCount must be non-negative, and this
- * aggregator must be a MEAN aggregator.
- */
- public abstract Counter<T> resetMeanToValue(long elementCount, T value);
-
- /**
- * Resets the counter's delta value to have no values accumulated and returns
- * the value of the delta prior to the reset.
- *
- * @return the aggregate delta at the time this method is called
- */
- public abstract T getAndResetDelta();
-
- /**
- * Resets the counter's delta value to have no values accumulated and returns
- * the value of the delta prior to the reset, for a MEAN counter.
- *
- * @return the mean delta t the time this method is called
- */
- public abstract CounterMean<T> getAndResetMeanDelta();
-
- /**
- * Returns the counter's flat name.
- */
- public String getFlatName() {
- return name.getFlatName();
- }
-
- /**
- * Returns the counter's name.
- *
- * @deprecated use {@link #getFlatName}.
- */
- @Deprecated
- public String getName() {
- return name.getFlatName();
- }
-
- /**
- * Returns the counter's aggregation kind.
- */
- public AggregationKind getKind() {
- return kind;
- }
-
- /**
- * Returns the counter's type.
- */
- public Class<?> getType() {
- return new TypeDescriptor<T>(getClass()) {}.getRawType();
- }
-
- /**
- * Returns the aggregated value, or the sum for MEAN aggregation, either
- * total or, if delta, since the last update extraction or resetDelta.
- */
- public abstract T getAggregate();
-
- /**
- * The mean value of a {@code Counter}, represented as an aggregate value and
- * a count.
- *
- * @param <T> the type of the aggregate
- */
- public static interface CounterMean<T> {
- /**
- * Gets the aggregate value of this {@code CounterMean}.
- */
- T getAggregate();
-
- /**
- * Gets the count of this {@code CounterMean}.
- */
- long getCount();
- }
-
- /**
- * Returns the mean in the form of a CounterMean, or null if this is not a
- * MEAN counter.
- */
- @Nullable
- public abstract CounterMean<T> getMean();
-
- /**
- * Represents whether counters' values have been committed to the backend.
- *
- * <p>Runners can use this information to optimize counters updates.
- * For example, if counters are committed, runners may choose to skip the updates.
- *
- * <p>Counters' state transition table:
- * {@code
- * Action\Current State COMMITTED DIRTY COMMITTING
- * addValue() DIRTY DIRTY DIRTY
- * committing() None COMMITTING None
- * committed() None None COMMITTED
- * }
- */
- @VisibleForTesting
- enum CommitState {
- /**
- * There are no local updates that haven't been committed to the backend.
- */
- COMMITTED,
- /**
- * There are local updates that haven't been committed to the backend.
- */
- DIRTY,
- /**
- * Local updates are committing to the backend, but are still pending.
- */
- COMMITTING,
- }
-
- /**
- * Returns if the counter contains non-committed aggregate.
- */
- public boolean isDirty() {
- return commitState.get() != CommitState.COMMITTED;
- }
-
- /**
- * Changes the counter from {@code CommitState.DIRTY} to {@code CommitState.COMMITTING}.
- *
- * @return true if successful. False return indicates that the commit state
- * was not in {@code CommitState.DIRTY}.
- */
- public boolean committing() {
- return commitState.compareAndSet(CommitState.DIRTY, CommitState.COMMITTING);
- }
-
- /**
- * Changes the counter from {@code CommitState.COMMITTING} to {@code CommitState.COMMITTED}.
- *
- * @return true if successful.
- *
- * <p>False return indicates that the counter was updated while the committing is pending.
- * That counter update might or might not has been committed. The {@code commitState} has to
- * stay in {@code CommitState.DIRTY}.
- */
- public boolean committed() {
- return commitState.compareAndSet(CommitState.COMMITTING, CommitState.COMMITTED);
- }
-
- /**
- * Sets the counter to {@code CommitState.DIRTY}.
- *
- * <p>Must be called at the end of {@link #addValue}, {@link #resetToValue},
- * {@link #resetMeanToValue}, and {@link #merge}.
- */
- protected void setDirty() {
- commitState.set(CommitState.DIRTY);
- }
-
- /**
- * Returns a string representation of the Counter. Useful for debugging logs.
- * Example return value: "ElementCount:SUM(15)".
- */
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append(getFlatName());
- sb.append(":");
- sb.append(getKind());
- sb.append("(");
- switch (kind) {
- case SUM:
- case MAX:
- case MIN:
- case AND:
- case OR:
- sb.append(getAggregate());
- break;
- case MEAN:
- sb.append(getMean());
- break;
- default:
- throw illegalArgumentException();
- }
- sb.append(")");
-
- return sb.toString();
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- } else if (o instanceof Counter) {
- Counter<?> that = (Counter<?>) o;
- if (this.name.equals(that.name) && this.kind == that.kind
- && this.getClass().equals(that.getClass())) {
- if (kind == MEAN) {
- CounterMean<T> thisMean = this.getMean();
- CounterMean<?> thatMean = that.getMean();
- return thisMean == thatMean
- || (Objects.equals(thisMean.getAggregate(), thatMean.getAggregate())
- && thisMean.getCount() == thatMean.getCount());
- } else {
- return Objects.equals(this.getAggregate(), that.getAggregate());
- }
- }
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- if (kind == MEAN) {
- CounterMean<T> mean = getMean();
- return Objects.hash(getClass(), name, kind, mean.getAggregate(), mean.getCount());
- } else {
- return Objects.hash(getClass(), name, kind, getAggregate());
- }
- }
-
- /**
- * Returns whether this Counter is compatible with that Counter. If
- * so, they can be merged into a single Counter.
- */
- public boolean isCompatibleWith(Counter<?> that) {
- return this.name.equals(that.name)
- && this.kind == that.kind
- && this.getClass().equals(that.getClass());
- }
-
- /**
- * Merges this counter with the provided counter, returning this counter with the combined value
- * of both counters. This may reset the delta of this counter.
- *
- * @throws IllegalArgumentException if the provided Counter is not compatible with this Counter
- */
- public abstract Counter<T> merge(Counter<T> that);
-
- //////////////////////////////////////////////////////////////////////////////
-
- /** The name and metadata of this counter. */
- protected final CounterName name;
-
- /** The kind of aggregation function to apply to this counter. */
- protected final AggregationKind kind;
-
- /** The commit state of this counter. */
- protected final AtomicReference<CommitState> commitState;
-
- protected Counter(CounterName name, AggregationKind kind) {
- this.name = name;
- this.kind = kind;
- this.commitState = new AtomicReference<>(CommitState.COMMITTED);
- }
-
- //////////////////////////////////////////////////////////////////////////////
-
- /**
- * Implements a {@link Counter} for {@link Long} values.
- */
- private static class LongCounter extends Counter<Long> {
- private final AtomicLong aggregate;
- private final AtomicLong deltaAggregate;
- private final AtomicReference<LongCounterMean> mean;
- private final AtomicReference<LongCounterMean> deltaMean;
-
- /** Initializes a new {@link Counter} for {@link Long} values. */
- private LongCounter(CounterName name, AggregationKind kind) {
- super(name, kind);
- switch (kind) {
- case MEAN:
- mean = new AtomicReference<>();
- deltaMean = new AtomicReference<>();
- getAndResetMeanDelta();
- mean.set(deltaMean.get());
- aggregate = deltaAggregate = null;
- break;
- case SUM:
- case MAX:
- case MIN:
- aggregate = new AtomicLong();
- deltaAggregate = new AtomicLong();
- getAndResetDelta();
- aggregate.set(deltaAggregate.get());
- mean = deltaMean = null;
- break;
- default:
- throw illegalArgumentException();
- }
- }
-
- @Override
- public LongCounter addValue(Long value) {
- try {
- switch (kind) {
- case SUM:
- aggregate.addAndGet(value);
- deltaAggregate.addAndGet(value);
- break;
- case MEAN:
- addToMeanAndSet(value, mean);
- addToMeanAndSet(value, deltaMean);
- break;
- case MAX:
- maxAndSet(value, aggregate);
- maxAndSet(value, deltaAggregate);
- break;
- case MIN:
- minAndSet(value, aggregate);
- minAndSet(value, deltaAggregate);
- break;
- default:
- throw illegalArgumentException();
- }
- return this;
- } finally {
- setDirty();
- }
- }
-
- private void minAndSet(Long value, AtomicLong target) {
- long current;
- long update;
- do {
- current = target.get();
- update = Math.min(value, current);
- } while (update < current && !target.compareAndSet(current, update));
- }
-
- private void maxAndSet(Long value, AtomicLong target) {
- long current;
- long update;
- do {
- current = target.get();
- update = Math.max(value, current);
- } while (update > current && !target.compareAndSet(current, update));
- }
-
- private void addToMeanAndSet(Long value, AtomicReference<LongCounterMean> target) {
- LongCounterMean current;
- LongCounterMean update;
- do {
- current = target.get();
- update = new LongCounterMean(current.getAggregate() + value, current.getCount() + 1L);
- } while (!target.compareAndSet(current, update));
- }
-
- @Override
- public Long getAggregate() {
- if (kind != MEAN) {
- return aggregate.get();
- } else {
- return getMean().getAggregate();
- }
- }
-
- @Override
- public Long getAndResetDelta() {
- switch (kind) {
- case SUM:
- return deltaAggregate.getAndSet(0L);
- case MAX:
- return deltaAggregate.getAndSet(Long.MIN_VALUE);
- case MIN:
- return deltaAggregate.getAndSet(Long.MAX_VALUE);
- default:
- throw illegalArgumentException();
- }
- }
-
- @Override
- public Counter<Long> resetToValue(Long value) {
- try {
- if (kind == MEAN) {
- throw illegalArgumentException();
- }
- aggregate.set(value);
- deltaAggregate.set(value);
- return this;
- } finally {
- setDirty();
- }
- }
-
- @Override
- public Counter<Long> resetMeanToValue(long elementCount, Long value) {
- try {
- if (kind != MEAN) {
- throw illegalArgumentException();
- }
- if (elementCount < 0) {
- throw new IllegalArgumentException("elementCount must be non-negative");
- }
- LongCounterMean counterMean = new LongCounterMean(value, elementCount);
- mean.set(counterMean);
- deltaMean.set(counterMean);
- return this;
- } finally {
- setDirty();
- }
- }
-
- @Override
- public CounterMean<Long> getAndResetMeanDelta() {
- if (kind != MEAN) {
- throw illegalArgumentException();
- }
- return deltaMean.getAndSet(new LongCounterMean(0L, 0L));
- }
-
- @Override
- @Nullable
- public CounterMean<Long> getMean() {
- if (kind != MEAN) {
- throw illegalArgumentException();
- }
- return mean.get();
- }
-
- @Override
- public Counter<Long> merge(Counter<Long> that) {
- try {
- checkArgument(
- this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that);
- switch (kind) {
- case SUM:
- case MIN:
- case MAX:
- return addValue(that.getAggregate());
- case MEAN:
- CounterMean<Long> thisCounterMean = this.getMean();
- CounterMean<Long> thatCounterMean = that.getMean();
- return resetMeanToValue(
- thisCounterMean.getCount() + thatCounterMean.getCount(),
- thisCounterMean.getAggregate() + thatCounterMean.getAggregate());
- default:
- throw illegalArgumentException();
- }
- } finally {
- setDirty();
- }
- }
-
- private static class LongCounterMean implements CounterMean<Long> {
- private final long aggregate;
- private final long count;
-
- public LongCounterMean(long aggregate, long count) {
- this.aggregate = aggregate;
- this.count = count;
- }
-
- @Override
- public Long getAggregate() {
- return aggregate;
- }
-
- @Override
- public long getCount() {
- return count;
- }
-
- @Override
- public String toString() {
- return aggregate + "/" + count;
- }
- }
- }
-
- /**
- * Implements a {@link Counter} for {@link Double} values.
- */
- private static class DoubleCounter extends Counter<Double> {
- AtomicDouble aggregate;
- AtomicDouble deltaAggregate;
- AtomicReference<DoubleCounterMean> mean;
- AtomicReference<DoubleCounterMean> deltaMean;
-
- /** Initializes a new {@link Counter} for {@link Double} values. */
- private DoubleCounter(CounterName name, AggregationKind kind) {
- super(name, kind);
- switch (kind) {
- case MEAN:
- aggregate = deltaAggregate = null;
- mean = new AtomicReference<>();
- deltaMean = new AtomicReference<>();
- getAndResetMeanDelta();
- mean.set(deltaMean.get());
- break;
- case SUM:
- case MAX:
- case MIN:
- mean = deltaMean = null;
- aggregate = new AtomicDouble();
- deltaAggregate = new AtomicDouble();
- getAndResetDelta();
- aggregate.set(deltaAggregate.get());
- break;
- default:
- throw illegalArgumentException();
- }
- }
-
- @Override
- public DoubleCounter addValue(Double value) {
- try {
- switch (kind) {
- case SUM:
- aggregate.addAndGet(value);
- deltaAggregate.addAndGet(value);
- break;
- case MEAN:
- addToMeanAndSet(value, mean);
- addToMeanAndSet(value, deltaMean);
- break;
- case MAX:
- maxAndSet(value, aggregate);
- maxAndSet(value, deltaAggregate);
- break;
- case MIN:
- minAndSet(value, aggregate);
- minAndSet(value, deltaAggregate);
- break;
- default:
- throw illegalArgumentException();
- }
- return this;
- } finally {
- setDirty();
- }
- }
-
- private void addToMeanAndSet(Double value, AtomicReference<DoubleCounterMean> target) {
- DoubleCounterMean current;
- DoubleCounterMean update;
- do {
- current = target.get();
- update = new DoubleCounterMean(current.getAggregate() + value, current.getCount() + 1);
- } while (!target.compareAndSet(current, update));
- }
-
- private void maxAndSet(Double value, AtomicDouble target) {
- double current;
- double update;
- do {
- current = target.get();
- update = Math.max(current, value);
- } while (update > current && !target.compareAndSet(current, update));
- }
-
- private void minAndSet(Double value, AtomicDouble target) {
- double current;
- double update;
- do {
- current = target.get();
- update = Math.min(current, value);
- } while (update < current && !target.compareAndSet(current, update));
- }
-
- @Override
- public Double getAndResetDelta() {
- switch (kind) {
- case SUM:
- return deltaAggregate.getAndSet(0.0);
- case MAX:
- return deltaAggregate.getAndSet(Double.NEGATIVE_INFINITY);
- case MIN:
- return deltaAggregate.getAndSet(Double.POSITIVE_INFINITY);
- default:
- throw illegalArgumentException();
- }
- }
-
- @Override
- public Counter<Double> resetToValue(Double value) {
- try {
- if (kind == MEAN) {
- throw illegalArgumentException();
- }
- aggregate.set(value);
- deltaAggregate.set(value);
- return this;
- } finally {
- setDirty();
- }
- }
-
- @Override
- public Counter<Double> resetMeanToValue(long elementCount, Double value) {
- try {
- if (kind != MEAN) {
- throw illegalArgumentException();
- }
- if (elementCount < 0) {
- throw new IllegalArgumentException("elementCount must be non-negative");
- }
- DoubleCounterMean counterMean = new DoubleCounterMean(value, elementCount);
- mean.set(counterMean);
- deltaMean.set(counterMean);
- return this;
- } finally {
- setDirty();
- }
- }
-
- @Override
- public CounterMean<Double> getAndResetMeanDelta() {
- if (kind != MEAN) {
- throw illegalArgumentException();
- }
- return deltaMean.getAndSet(new DoubleCounterMean(0.0, 0L));
- }
-
- @Override
- public Double getAggregate() {
- if (kind != MEAN) {
- return aggregate.get();
- } else {
- return getMean().getAggregate();
- }
- }
-
- @Override
- @Nullable
- public CounterMean<Double> getMean() {
- if (kind != MEAN) {
- throw illegalArgumentException();
- }
- return mean.get();
- }
-
- @Override
- public Counter<Double> merge(Counter<Double> that) {
- try {
- checkArgument(
- this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that);
- switch (kind) {
- case SUM:
- case MIN:
- case MAX:
- return addValue(that.getAggregate());
- case MEAN:
- CounterMean<Double> thisCounterMean = this.getMean();
- CounterMean<Double> thatCounterMean = that.getMean();
- return resetMeanToValue(
- thisCounterMean.getCount() + thatCounterMean.getCount(),
- thisCounterMean.getAggregate() + thatCounterMean.getAggregate());
- default:
- throw illegalArgumentException();
- }
- } finally {
- setDirty();
- }
- }
-
- private static class DoubleCounterMean implements CounterMean<Double> {
- private final double aggregate;
- private final long count;
-
- public DoubleCounterMean(double aggregate, long count) {
- this.aggregate = aggregate;
- this.count = count;
- }
-
- @Override
- public Double getAggregate() {
- return aggregate;
- }
-
- @Override
- public long getCount() {
- return count;
- }
-
- @Override
- public String toString() {
- return aggregate + "/" + count;
- }
- }
- }
-
- /**
- * Implements a {@link Counter} for {@link Boolean} values.
- */
- private static class BooleanCounter extends Counter<Boolean> {
- private final AtomicBoolean aggregate;
- private final AtomicBoolean deltaAggregate;
-
- /** Initializes a new {@link Counter} for {@link Boolean} values. */
- private BooleanCounter(CounterName name, AggregationKind kind) {
- super(name, kind);
- aggregate = new AtomicBoolean();
- deltaAggregate = new AtomicBoolean();
- getAndResetDelta();
- aggregate.set(deltaAggregate.get());
- }
-
- @Override
- public BooleanCounter addValue(Boolean value) {
- try {
- if (kind.equals(AND) && !value) {
- aggregate.set(value);
- deltaAggregate.set(value);
- } else if (kind.equals(OR) && value) {
- aggregate.set(value);
- deltaAggregate.set(value);
- }
- return this;
- } finally {
- setDirty();
- }
- }
-
- @Override
- public Boolean getAndResetDelta() {
- switch (kind) {
- case AND:
- return deltaAggregate.getAndSet(true);
- case OR:
- return deltaAggregate.getAndSet(false);
- default:
- throw illegalArgumentException();
- }
- }
-
- @Override
- public Counter<Boolean> resetToValue(Boolean value) {
- try {
- aggregate.set(value);
- deltaAggregate.set(value);
- return this;
- } finally {
- setDirty();
- }
- }
-
- @Override
- public Counter<Boolean> resetMeanToValue(long elementCount, Boolean value) {
- throw illegalArgumentException();
- }
-
- @Override
- public CounterMean<Boolean> getAndResetMeanDelta() {
- throw illegalArgumentException();
- }
-
- @Override
- public Boolean getAggregate() {
- return aggregate.get();
- }
-
- @Override
- @Nullable
- public CounterMean<Boolean> getMean() {
- throw illegalArgumentException();
- }
-
- @Override
- public Counter<Boolean> merge(Counter<Boolean> that) {
- try {
- checkArgument(
- this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that);
- return addValue(that.getAggregate());
- } finally {
- setDirty();
- }
- }
- }
-
- /**
- * Implements a {@link Counter} for {@link String} values.
- */
- private static class StringCounter extends Counter<String> {
- /** Initializes a new {@link Counter} for {@link String} values. */
- private StringCounter(CounterName name, AggregationKind kind) {
- super(name, kind);
- // TODO: Support MIN, MAX of Strings.
- throw illegalArgumentException();
- }
-
- @Override
- public StringCounter addValue(String value) {
- switch (kind) {
- default:
- throw illegalArgumentException();
- }
- }
-
- @Override
- public Counter<String> resetToValue(String value) {
- switch (kind) {
- default:
- throw illegalArgumentException();
- }
- }
-
- @Override
- public Counter<String> resetMeanToValue(long elementCount, String value) {
- switch (kind) {
- default:
- throw illegalArgumentException();
- }
- }
-
- @Override
- public String getAndResetDelta() {
- switch (kind) {
- default:
- throw illegalArgumentException();
- }
- }
-
- @Override
- public CounterMean<String> getAndResetMeanDelta() {
- switch (kind) {
- default:
- throw illegalArgumentException();
- }
- }
-
- @Override
- public String getAggregate() {
- switch (kind) {
- default:
- throw illegalArgumentException();
- }
- }
-
- @Override
- @Nullable
- public CounterMean<String> getMean() {
- switch (kind) {
- default:
- throw illegalArgumentException();
- }
- }
-
- @Override
- public Counter<String> merge(Counter<String> that) {
- checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that);
- switch (kind) {
- default:
- throw illegalArgumentException();
- }
- }
- }
-
- /**
- * Implements a {@link Counter} for {@link Integer} values.
- */
- private static class IntegerCounter extends Counter<Integer> {
- private final AtomicInteger aggregate;
- private final AtomicInteger deltaAggregate;
- private final AtomicReference<IntegerCounterMean> mean;
- private final AtomicReference<IntegerCounterMean> deltaMean;
-
- /** Initializes a new {@link Counter} for {@link Integer} values. */
- private IntegerCounter(CounterName name, AggregationKind kind) {
- super(name, kind);
- switch (kind) {
- case MEAN:
- aggregate = deltaAggregate = null;
- mean = new AtomicReference<>();
- deltaMean = new AtomicReference<>();
- getAndResetMeanDelta();
- mean.set(deltaMean.get());
- break;
- case SUM:
- case MAX:
- case MIN:
- mean = deltaMean = null;
- aggregate = new AtomicInteger();
- deltaAggregate = new AtomicInteger();
- getAndResetDelta();
- aggregate.set(deltaAggregate.get());
- break;
- default:
- throw illegalArgumentException();
- }
- }
-
- @Override
- public IntegerCounter addValue(Integer value) {
- try {
- switch (kind) {
- case SUM:
- aggregate.getAndAdd(value);
- deltaAggregate.getAndAdd(value);
- break;
- case MEAN:
- addToMeanAndSet(value, mean);
- addToMeanAndSet(value, deltaMean);
- break;
- case MAX:
- maxAndSet(value, aggregate);
- maxAndSet(value, deltaAggregate);
- break;
- case MIN:
- minAndSet(value, aggregate);
- minAndSet(value, deltaAggregate);
- break;
- default:
- throw illegalArgumentException();
- }
- return this;
- } finally {
- setDirty();
- }
- }
-
- private void addToMeanAndSet(int value, AtomicReference<IntegerCounterMean> target) {
- IntegerCounterMean current;
- IntegerCounterMean update;
- do {
- current = target.get();
- update = new IntegerCounterMean(current.getAggregate() + value, current.getCount() + 1);
- } while (!target.compareAndSet(current, update));
- }
-
- private void maxAndSet(int value, AtomicInteger target) {
- int current;
- int update;
- do {
- current = target.get();
- update = Math.max(value, current);
- } while (update > current && !target.compareAndSet(current, update));
- }
-
- private void minAndSet(int value, AtomicInteger target) {
- int current;
- int update;
- do {
- current = target.get();
- update = Math.min(value, current);
- } while (update < current && !target.compareAndSet(current, update));
- }
-
- @Override
- public Integer getAndResetDelta() {
- switch (kind) {
- case SUM:
- return deltaAggregate.getAndSet(0);
- case MAX:
- return deltaAggregate.getAndSet(Integer.MIN_VALUE);
- case MIN:
- return deltaAggregate.getAndSet(Integer.MAX_VALUE);
- default:
- throw illegalArgumentException();
- }
- }
-
- @Override
- public Counter<Integer> resetToValue(Integer value) {
- try {
- if (kind == MEAN) {
- throw illegalArgumentException();
- }
- aggregate.set(value);
- deltaAggregate.set(value);
- return this;
- } finally {
- setDirty();
- }
- }
-
- @Override
- public Counter<Integer> resetMeanToValue(long elementCount, Integer value) {
- try {
- if (kind != MEAN) {
- throw illegalArgumentException();
- }
- if (elementCount < 0) {
- throw new IllegalArgumentException("elementCount must be non-negative");
- }
- IntegerCounterMean counterMean = new IntegerCounterMean(value, elementCount);
- mean.set(counterMean);
- deltaMean.set(counterMean);
- return this;
- } finally {
- setDirty();
- }
- }
-
- @Override
- public CounterMean<Integer> getAndResetMeanDelta() {
- if (kind != MEAN) {
- throw illegalArgumentException();
- }
- return deltaMean.getAndSet(new IntegerCounterMean(0, 0L));
- }
-
- @Override
- public Integer getAggregate() {
- if (kind != MEAN) {
- return aggregate.get();
- } else {
- return getMean().getAggregate();
- }
- }
-
- @Override
- @Nullable
- public CounterMean<Integer> getMean() {
- if (kind != MEAN) {
- throw illegalArgumentException();
- }
- return mean.get();
- }
-
- @Override
- public Counter<Integer> merge(Counter<Integer> that) {
- try {
- checkArgument(
- this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that);
- switch (kind) {
- case SUM:
- case MIN:
- case MAX:
- return addValue(that.getAggregate());
- case MEAN:
- CounterMean<Integer> thisCounterMean = this.getMean();
- CounterMean<Integer> thatCounterMean = that.getMean();
- return resetMeanToValue(
- thisCounterMean.getCount() + thatCounterMean.getCount(),
- thisCounterMean.getAggregate() + thatCounterMean.getAggregate());
- default:
- throw illegalArgumentException();
- }
- } finally {
- setDirty();
- }
- }
-
- private static class IntegerCounterMean implements CounterMean<Integer> {
- private final int aggregate;
- private final long count;
-
- public IntegerCounterMean(int aggregate, long count) {
- this.aggregate = aggregate;
- this.count = count;
- }
-
- @Override
- public Integer getAggregate() {
- return aggregate;
- }
-
- @Override
- public long getCount() {
- return count;
- }
-
- @Override
- public String toString() {
- return aggregate + "/" + count;
- }
- }
- }
-
- //////////////////////////////////////////////////////////////////////////////
-
- /**
- * Constructs an {@link IllegalArgumentException} explaining that this
- * {@link Counter}'s aggregation kind is not supported by its value type.
- */
- protected IllegalArgumentException illegalArgumentException() {
- return new IllegalArgumentException("Cannot compute " + kind
- + " aggregation over " + getType().getSimpleName() + " values.");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterName.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterName.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterName.java
deleted file mode 100644
index b46be98..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterName.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.common;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.base.Strings;
-
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * The name of a counter identifies the user-specified name, as well as the origin,
- * the step the counter is associated with, and a prefix to add to the name.
- *
- * <p>For backwards compatibility, the {@link CounterName} will be converted to
- * a flat name (string) during the migration.
- */
-public class CounterName {
- /**
- * Returns a {@link CounterName} with the given name.
- */
- public static CounterName named(String name) {
- return new CounterName(name, "", "", "");
- }
-
- /**
- * Returns a msecs {@link CounterName}.
- */
- public static CounterName msecs(String name) {
- return named(name + "-msecs");
- }
-
- /**
- * Returns a {@link CounterName} identical to this, but with the given origin.
- */
- public CounterName withOrigin(String origin) {
- return new CounterName(this.name, origin, this.stepName, this.prefix);
- }
-
- /**
- * Returns a {@link CounterName} identical to this, but with the given step name.
- */
- public CounterName withStepName(String stepName) {
- return new CounterName(this.name, this.origin, stepName, this.prefix);
- }
-
- /**
- * Returns a {@link CounterName} identical to this, but with the given prefix.
- */
- public CounterName withPrefix(String prefix) {
- return new CounterName(this.name, this.origin, this.stepName, prefix);
- }
-
- /**
- * Name of the counter.
- *
- * <p>For example, process-msecs, ElementCount.
- */
- private final String name;
-
- /**
- * Origin (namespace) of counter name.
- *
- * <p>For example, "user" for user-defined counters.
- * It is empty for counters defined by the SDK or the runner.
- */
- private final String origin;
-
- /**
- * System defined step name or the named-output of a step.
- *
- * <p>For example, {@code s1} or {@code s2.out}.
- * It may be empty when counters don't associate with step names.
- */
- private final String stepName;
-
- /**
- * Prefix of group of counters.
- *
- * <p>It is empty when counters don't have general prefixes.
- */
- private final String prefix;
-
- /**
- * Flat name is the equivalent unstructured name.
- *
- * <p>It is null before {@link #getFlatName()} is called.
- */
- private AtomicReference<String> flatName;
-
- private CounterName(String name, String origin, String stepName, String prefix) {
- this.name = checkNotNull(name, "name");
- this.origin = checkNotNull(origin, "origin");
- this.stepName = checkNotNull(stepName, "stepName");
- this.prefix = checkNotNull(prefix, "prefix");
- this.flatName = new AtomicReference<String>();
- }
-
- /**
- * Returns the flat name of a structured counter.
- */
- public String getFlatName() {
- String ret = flatName.get();
- if (ret == null) {
- StringBuilder sb = new StringBuilder();
- if (!Strings.isNullOrEmpty(prefix)) {
- // Not all runner versions use "-" to concatenate prefix, it may already have it in it.
- sb.append(prefix);
- }
- if (!Strings.isNullOrEmpty(origin)) {
- sb.append(origin + "-");
- }
- if (!Strings.isNullOrEmpty(stepName)) {
- sb.append(stepName + "-");
- }
- sb.append(name);
- flatName.compareAndSet(null, sb.toString());
- ret = flatName.get();
- }
- return ret;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- } else if (o instanceof CounterName) {
- CounterName that = (CounterName) o;
- return this.getFlatName().equals(that.getFlatName());
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- return getFlatName().hashCode();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterProvider.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterProvider.java
deleted file mode 100644
index c2550cd..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterProvider.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.common;
-
-/**
- * A counter provider can provide {@link Counter} instances.
- *
- * @param <T> the input type of the counter.
- */
-public interface CounterProvider<T> {
- Counter<T> getCounter(String name);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterSet.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterSet.java
deleted file mode 100644
index cb0ffe5..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterSet.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.common;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.util.AbstractSet;
-import java.util.HashMap;
-import java.util.Iterator;
-
-/**
- * A CounterSet maintains a set of {@link Counter}s.
- *
- * <p>Thread-safe.
- */
-public class CounterSet extends AbstractSet<Counter<?>> {
-
- /** Registered counters. */
- private final HashMap<String, Counter<?>> counters = new HashMap<>();
-
- private final AddCounterMutator addCounterMutator = new AddCounterMutator();
-
- /**
- * Constructs a CounterSet containing the given Counters.
- */
- public CounterSet(Counter<?>... counters) {
- for (Counter<?> counter : counters) {
- addNewCounter(counter);
- }
- }
-
- /**
- * Returns an object that supports adding additional counters into
- * this CounterSet.
- */
- public AddCounterMutator getAddCounterMutator() {
- return addCounterMutator;
- }
-
- /**
- * Adds a new counter, throwing an exception if a counter of the
- * same name already exists.
- */
- public void addNewCounter(Counter<?> counter) {
- if (!addCounter(counter)) {
- throw new IllegalArgumentException(
- "Counter " + counter + " duplicates an existing counter in " + this);
- }
- }
-
- /**
- * Adds the given Counter to this CounterSet.
- *
- * <p>If a counter with the same name already exists, it will be
- * reused, as long as it is compatible.
- *
- * @return the Counter that was reused, or added
- * @throws IllegalArgumentException if a counter with the same
- * name but an incompatible kind had already been added
- */
- public synchronized <T> Counter<T> addOrReuseCounter(Counter<T> counter) {
- String flatName = counter.getFlatName();
- Counter<?> oldCounter = counters.get(flatName);
- if (oldCounter == null) {
- // A new counter.
- counters.put(flatName, counter);
- return counter;
- }
- if (counter.isCompatibleWith(oldCounter)) {
- // Return the counter to reuse.
- @SuppressWarnings("unchecked")
- Counter<T> compatibleCounter = (Counter<T>) oldCounter;
- return compatibleCounter;
- }
- throw new IllegalArgumentException(
- "Counter " + counter + " duplicates incompatible counter "
- + oldCounter + " in " + this);
- }
-
- /**
- * Adds a counter. Returns {@code true} if the counter was added to the set
- * and false if the given counter was {@code null} or it already existed in
- * the set.
- *
- * @param counter to register
- */
- public boolean addCounter(Counter<?> counter) {
- return add(counter);
- }
-
- /**
- * Returns the Counter with the given name in this CounterSet;
- * returns null if no such Counter exists.
- */
- public synchronized Counter<?> getExistingCounter(String name) {
- return counters.get(name);
- }
-
- @Override
- public synchronized Iterator<Counter<?>> iterator() {
- return counters.values().iterator();
- }
-
- @Override
- public synchronized int size() {
- return counters.size();
- }
-
- @Override
- public synchronized boolean add(Counter<?> e) {
- if (null == e) {
- return false;
- }
- if (counters.containsKey(e.getFlatName())) {
- return false;
- }
- counters.put(e.getFlatName(), e);
- return true;
- }
-
- public synchronized void merge(CounterSet that) {
- for (Counter<?> theirCounter : that) {
- Counter<?> myCounter = counters.get(theirCounter.getFlatName());
- if (myCounter != null) {
- mergeCounters(myCounter, theirCounter);
- } else {
- addCounter(theirCounter);
- }
- }
- }
-
- private <T> void mergeCounters(Counter<T> mine, Counter<?> theirCounter) {
- checkArgument(
- mine.isCompatibleWith(theirCounter),
- "Can't merge CounterSets containing incompatible counters with the same name: "
- + "%s (existing) and %s (merged)",
- mine,
- theirCounter);
- @SuppressWarnings("unchecked")
- Counter<T> theirs = (Counter<T>) theirCounter;
- mine.merge(theirs);
- }
-
- /**
- * A nested class that supports adding additional counters into the
- * enclosing CounterSet. This is useful as a mutator, hiding other
- * public methods of the CounterSet.
- */
- public class AddCounterMutator {
- /**
- * Adds the given Counter into the enclosing CounterSet.
- *
- * <p>If a counter with the same name already exists, it will be
- * reused, as long as it has the same type.
- *
- * @return the Counter that was reused, or added
- * @throws IllegalArgumentException if a counter with the same
- * name but an incompatible kind had already been added
- */
- public <T> Counter<T> addCounter(Counter<T> counter) {
- return addOrReuseCounter(counter);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObserver.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObserver.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObserver.java
index 3e7011b..388355e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObserver.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObserver.java
@@ -19,37 +19,21 @@ package org.apache.beam.sdk.util.common;
import java.util.Observable;
import java.util.Observer;
-import javax.annotation.Nullable;
/**
- * An observer that gets notified when additional bytes are read
- * and/or used. It adds all bytes into a local counter. When the
- * observer gets advanced via the next() call, it adds the total byte
- * count to the specified counter, and prepares for the next element.
+ * An observer that gets notified when additional bytes are read and/or used.
*/
-public class ElementByteSizeObserver implements Observer {
- @Nullable
- private final Counter<Long> counter;
+public abstract class ElementByteSizeObserver implements Observer {
private boolean isLazy = false;
private long totalSize = 0;
private double scalingFactor = 1.0;
- public ElementByteSizeObserver() {
- this.counter = null;
- }
-
- public ElementByteSizeObserver(Counter<Long> counter) {
- this.counter = counter;
- }
+ public ElementByteSizeObserver() {}
/**
* Called to report element byte size.
*/
- protected void reportElementSize(long elementByteSize) {
- if (counter != null) {
- counter.addValue(elementByteSize);
- }
- }
+ protected abstract void reportElementSize(long elementByteSize);
/**
* Sets byte counting for the current element as lazy. That is, the
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CounterAggregatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CounterAggregatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CounterAggregatorTest.java
deleted file mode 100644
index 3f96cf2..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CounterAggregatorTest.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.apache.beam.sdk.util.common.Counter.AggregationKind.MAX;
-import static org.apache.beam.sdk.util.common.Counter.AggregationKind.MIN;
-import static org.apache.beam.sdk.util.common.Counter.AggregationKind.SUM;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.withSettings;
-
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.IterableCombineFn;
-import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.util.common.Counter;
-import org.apache.beam.sdk.util.common.CounterProvider;
-import org.apache.beam.sdk.util.common.CounterSet;
-import org.apache.beam.sdk.util.common.CounterSet.AddCounterMutator;
-
-import com.google.common.collect.Iterables;
-
-import org.hamcrest.Matchers;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Unit tests for the {@link Aggregator} API.
- */
-@RunWith(JUnit4.class)
-public class CounterAggregatorTest {
- @Rule
- public final ExpectedException expectedEx = ExpectedException.none();
-
- private static final String AGGREGATOR_NAME = "aggregator_name";
-
- @SuppressWarnings("rawtypes")
- private <V, AccumT> void testAggregator(List<V> items,
- Combine.CombineFn<V, AccumT, V> combiner,
- Counter expectedCounter) {
- CounterSet counters = new CounterSet();
- Aggregator<V, V> aggregator = new CounterAggregator<>(
- AGGREGATOR_NAME, combiner, counters.getAddCounterMutator());
- for (V item : items) {
- aggregator.addValue(item);
- }
-
- assertEquals(Iterables.getOnlyElement(counters), expectedCounter);
- }
-
- @Test
- public void testGetName() {
- String name = "testAgg";
- CounterAggregator<Long, long[], Long> aggregator = new CounterAggregator<>(
- name, new Sum.SumLongFn(),
- new CounterSet().getAddCounterMutator());
-
- assertEquals(name, aggregator.getName());
- }
-
- @Test
- public void testGetCombineFn() {
- CombineFn<Long, ?, Long> combineFn = new Min.MinLongFn();
-
- CounterAggregator<Long, ?, Long> aggregator = new CounterAggregator<>("foo",
- combineFn, new CounterSet().getAddCounterMutator());
-
- assertEquals(combineFn, aggregator.getCombineFn());
- }
-
- @Test
-
- public void testSumInteger() throws Exception {
- testAggregator(Arrays.asList(2, 4, 1, 3), new Sum.SumIntegerFn(),
- Counter.ints(AGGREGATOR_NAME, SUM).resetToValue(10));
- }
-
- @Test
- public void testSumLong() throws Exception {
- testAggregator(Arrays.asList(2L, 4L, 1L, 3L), new Sum.SumLongFn(),
- Counter.longs(AGGREGATOR_NAME, SUM).resetToValue(10L));
- }
-
- @Test
- public void testSumDouble() throws Exception {
- testAggregator(Arrays.asList(2.0, 4.1, 1.0, 3.1), new Sum.SumDoubleFn(),
- Counter.doubles(AGGREGATOR_NAME, SUM).resetToValue(10.2));
- }
-
- @Test
- public void testMinInteger() throws Exception {
- testAggregator(Arrays.asList(2, 4, 1, 3), new Min.MinIntegerFn(),
- Counter.ints(AGGREGATOR_NAME, MIN).resetToValue(1));
- }
-
- @Test
- public void testMinLong() throws Exception {
- testAggregator(Arrays.asList(2L, 4L, 1L, 3L), new Min.MinLongFn(),
- Counter.longs(AGGREGATOR_NAME, MIN).resetToValue(1L));
- }
-
- @Test
- public void testMinDouble() throws Exception {
- testAggregator(Arrays.asList(2.0, 4.1, 1.0, 3.1), new Min.MinDoubleFn(),
- Counter.doubles(AGGREGATOR_NAME, MIN).resetToValue(1.0));
- }
-
- @Test
- public void testMaxInteger() throws Exception {
- testAggregator(Arrays.asList(2, 4, 1, 3), new Max.MaxIntegerFn(),
- Counter.ints(AGGREGATOR_NAME, MAX).resetToValue(4));
- }
-
- @Test
- public void testMaxLong() throws Exception {
- testAggregator(Arrays.asList(2L, 4L, 1L, 3L), new Max.MaxLongFn(),
- Counter.longs(AGGREGATOR_NAME, MAX).resetToValue(4L));
- }
-
- @Test
- public void testMaxDouble() throws Exception {
- testAggregator(Arrays.asList(2.0, 4.1, 1.0, 3.1), new Max.MaxDoubleFn(),
- Counter.doubles(AGGREGATOR_NAME, MAX).resetToValue(4.1));
- }
-
- @Test
- public void testCounterProviderCallsProvidedCounterAddValue() {
- @SuppressWarnings("unchecked")
- CombineFn<String, ?, String> combiner = mock(CombineFn.class,
- withSettings().extraInterfaces(CounterProvider.class));
- @SuppressWarnings("unchecked")
- CounterProvider<String> provider = (CounterProvider<String>) combiner;
-
- @SuppressWarnings("unchecked")
- Counter<String> mockCounter = mock(Counter.class);
- String name = "foo";
- when(provider.getCounter(name)).thenReturn(mockCounter);
-
- AddCounterMutator addCounterMutator = mock(AddCounterMutator.class);
- when(addCounterMutator.addCounter(mockCounter)).thenReturn(mockCounter);
-
- Aggregator<String, String> aggregator =
- new CounterAggregator<>(name, combiner, addCounterMutator);
-
- aggregator.addValue("bar_baz");
-
- verify(mockCounter).addValue("bar_baz");
- verify(addCounterMutator).addCounter(mockCounter);
- }
-
-
- @Test
- public void testCompatibleDuplicateNames() throws Exception {
- CounterSet counters = new CounterSet();
- Aggregator<Integer, Integer> aggregator1 = new CounterAggregator<>(
- AGGREGATOR_NAME, new Sum.SumIntegerFn(),
- counters.getAddCounterMutator());
-
- Aggregator<Integer, Integer> aggregator2 = new CounterAggregator<>(
- AGGREGATOR_NAME, new Sum.SumIntegerFn(),
- counters.getAddCounterMutator());
-
- // The duplicate aggregators should update the same counter.
- aggregator1.addValue(3);
- aggregator2.addValue(4);
- Assert.assertEquals(
- new CounterSet(Counter.ints(AGGREGATOR_NAME, SUM).resetToValue(7)),
- counters);
- }
-
- @Test
- public void testIncompatibleDuplicateNames() throws Exception {
- CounterSet counters = new CounterSet();
- new CounterAggregator<>(
- AGGREGATOR_NAME, new Sum.SumIntegerFn(),
- counters.getAddCounterMutator());
-
- expectedEx.expect(IllegalArgumentException.class);
- expectedEx.expectMessage(Matchers.containsString(
- "aggregator's name collides with an existing aggregator or "
- + "system-provided counter of an incompatible type"));
- new CounterAggregator<>(
- AGGREGATOR_NAME, new Sum.SumLongFn(),
- counters.getAddCounterMutator());
- }
-
- @Test
- public void testUnsupportedCombineFn() throws Exception {
- expectedEx.expect(IllegalArgumentException.class);
- expectedEx.expectMessage(Matchers.containsString("unsupported combiner"));
- new CounterAggregator<>(
- AGGREGATOR_NAME,
- new Combine.CombineFn<Integer, List<Integer>, Integer>() {
- @Override
- public List<Integer> createAccumulator() {
- return null;
- }
- @Override
- public List<Integer> addInput(List<Integer> accumulator, Integer input) {
- return null;
- }
- @Override
- public List<Integer> mergeAccumulators(Iterable<List<Integer>> accumulators) {
- return null;
- }
- @Override
- public Integer extractOutput(List<Integer> accumulator) {
- return null;
- }
- }, (new CounterSet()).getAddCounterMutator());
- }
-
- @Test
- public void testUnsupportedSerializableFunction() throws Exception {
- expectedEx.expect(IllegalArgumentException.class);
- expectedEx.expectMessage(Matchers.containsString("unsupported combiner"));
- CombineFn<Integer, List<Integer>, Integer> combiner = IterableCombineFn
- .<Integer>of(new SerializableFunction<Iterable<Integer>, Integer>() {
- @Override
- public Integer apply(Iterable<Integer> input) {
- return null;
- }
- });
- new CounterAggregator<>(AGGREGATOR_NAME, combiner,
- (new CounterSet()).getAddCounterMutator());
- }
-}
[3/3] incubator-beam git commit: This closes #815
Posted by bc...@apache.org.
This closes #815
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a0769ad2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a0769ad2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a0769ad2
Branch: refs/heads/master
Commit: a0769ad2a348c1296086b9dc8994e32ba5a06760
Parents: 3a858ee d20a7ad
Author: bchambers <bc...@google.com>
Authored: Thu Aug 11 10:28:04 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Thu Aug 11 11:06:56 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/util/DoFnRunners.java | 78 --
.../beam/runners/dataflow/DataflowRunner.java | 4 +-
.../org/apache/beam/sdk/transforms/Combine.java | 13 -
.../org/apache/beam/sdk/transforms/Max.java | 27 +-
.../org/apache/beam/sdk/transforms/Min.java | 28 +-
.../org/apache/beam/sdk/transforms/Sum.java | 27 +-
.../apache/beam/sdk/util/CounterAggregator.java | 128 --
.../apache/beam/sdk/util/common/Counter.java | 1287 ------------------
.../beam/sdk/util/common/CounterName.java | 153 ---
.../beam/sdk/util/common/CounterProvider.java | 27 -
.../apache/beam/sdk/util/common/CounterSet.java | 179 ---
.../util/common/ElementByteSizeObserver.java | 24 +-
.../beam/sdk/util/CounterAggregatorTest.java | 256 ----
.../beam/sdk/util/common/CounterSetTest.java | 227 ---
.../beam/sdk/util/common/CounterTest.java | 736 ----------
15 files changed, 15 insertions(+), 3179 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0769ad2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------