You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/06/19 17:23:42 UTC
[1/2] beam git commit: A few cleanups in CombineTest
Repository: beam
Updated Branches:
refs/heads/master c12d6ba80 -> 1fb430442
A few cleanups in CombineTest
Better error messages and IntelliJ warning cleanups.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cf654a0b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cf654a0b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cf654a0b
Branch: refs/heads/master
Commit: cf654a0bcd876310311f48deb64cd49d7df2893c
Parents: c12d6ba
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Jun 16 13:07:48 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon Jun 19 10:23:20 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/transforms/CombineTest.java | 125 ++++++++-----------
1 file changed, 53 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/cf654a0b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index c4ba62d..e2469ab 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.sdk.transforms;
-import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.beam.sdk.TestUtils.checkCombineFn;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
@@ -45,7 +45,6 @@ import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.KvCoder;
@@ -85,7 +84,6 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import org.mockito.Mock;
/**
* Tests for Combine transforms.
@@ -97,8 +95,6 @@ public class CombineTest implements Serializable {
static final List<KV<String, Integer>> EMPTY_TABLE = Collections.emptyList();
- @Mock private DoFn<?, ?>.ProcessContext processContext;
-
@Rule
public final transient TestPipeline pipeline = TestPipeline.create();
@@ -142,12 +138,12 @@ public class CombineTest implements Serializable {
PCollection<KV<String, String>> combinePerKey =
perKeyInput.apply(
Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(globallySumView))
- .withSideInputs(Arrays.asList(globallySumView)));
+ .withSideInputs(globallySumView));
PCollection<String> combineGlobally = globallyInput
.apply(Combine.globally(new TestCombineFnWithContext(globallySumView))
.withoutDefaults()
- .withSideInputs(Arrays.asList(globallySumView)));
+ .withSideInputs(globallySumView));
PAssert.that(sum).containsInAnyOrder(globalSum);
PAssert.that(combinePerKey).containsInAnyOrder(perKeyCombines);
@@ -280,11 +276,9 @@ public class CombineTest implements Serializable {
.apply(Combine.<String, Integer, String>perKey(new TestCombineFn()));
PAssert.that(sum).containsInAnyOrder(2, 5, 13);
- PAssert.that(sumPerKey).containsInAnyOrder(
- KV.of("a", "11"),
- KV.of("a", "4"),
- KV.of("b", "1"),
- KV.of("b", "13"));
+ PAssert.that(sumPerKey)
+ .containsInAnyOrder(
+ Arrays.asList(KV.of("a", "11"), KV.of("a", "4"), KV.of("b", "1"), KV.of("b", "13")));
pipeline.run();
}
@@ -313,19 +307,18 @@ public class CombineTest implements Serializable {
PCollection<KV<String, String>> combinePerKeyWithContext =
perKeyInput.apply(
Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(globallySumView))
- .withSideInputs(Arrays.asList(globallySumView)));
+ .withSideInputs(globallySumView));
PCollection<String> combineGloballyWithContext = globallyInput
.apply(Combine.globally(new TestCombineFnWithContext(globallySumView))
.withoutDefaults()
- .withSideInputs(Arrays.asList(globallySumView)));
+ .withSideInputs(globallySumView));
PAssert.that(sum).containsInAnyOrder(2, 5, 13);
- PAssert.that(combinePerKeyWithContext).containsInAnyOrder(
- KV.of("a", "2:11"),
- KV.of("a", "5:4"),
- KV.of("b", "5:1"),
- KV.of("b", "13:13"));
+ PAssert.that(combinePerKeyWithContext)
+ .containsInAnyOrder(
+ Arrays.asList(
+ KV.of("a", "2:11"), KV.of("a", "5:4"), KV.of("b", "5:1"), KV.of("b", "13:13")));
PAssert.that(combineGloballyWithContext).containsInAnyOrder("2:11", "5:14", "13:13");
pipeline.run();
}
@@ -355,23 +348,25 @@ public class CombineTest implements Serializable {
PCollection<KV<String, String>> combinePerKeyWithContext =
perKeyInput.apply(
Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(globallySumView))
- .withSideInputs(Arrays.asList(globallySumView)));
+ .withSideInputs(globallySumView));
PCollection<String> combineGloballyWithContext = globallyInput
.apply(Combine.globally(new TestCombineFnWithContext(globallySumView))
.withoutDefaults()
- .withSideInputs(Arrays.asList(globallySumView)));
+ .withSideInputs(globallySumView));
PAssert.that(sum).containsInAnyOrder(1, 2, 1, 4, 5, 14, 13);
- PAssert.that(combinePerKeyWithContext).containsInAnyOrder(
- KV.of("a", "1:1"),
- KV.of("a", "2:11"),
- KV.of("a", "1:1"),
- KV.of("a", "4:4"),
- KV.of("a", "5:4"),
- KV.of("b", "5:1"),
- KV.of("b", "14:113"),
- KV.of("b", "13:13"));
+ PAssert.that(combinePerKeyWithContext)
+ .containsInAnyOrder(
+ Arrays.asList(
+ KV.of("a", "1:1"),
+ KV.of("a", "2:11"),
+ KV.of("a", "1:1"),
+ KV.of("a", "4:4"),
+ KV.of("a", "5:4"),
+ KV.of("b", "5:1"),
+ KV.of("b", "14:113"),
+ KV.of("b", "13:13")));
PAssert.that(combineGloballyWithContext).containsInAnyOrder(
"1:1", "2:11", "1:1", "4:4", "5:14", "14:113", "13:13");
pipeline.run();
@@ -433,10 +428,8 @@ public class CombineTest implements Serializable {
.apply(Combine.<String, Integer, String>perKey(new TestCombineFn()));
PAssert.that(sum).containsInAnyOrder(7, 13);
- PAssert.that(sumPerKey).containsInAnyOrder(
- KV.of("a", "114"),
- KV.of("b", "1"),
- KV.of("b", "13"));
+ PAssert.that(sumPerKey)
+ .containsInAnyOrder(Arrays.asList(KV.of("a", "114"), KV.of("b", "1"), KV.of("b", "13")));
pipeline.run();
}
@@ -471,7 +464,7 @@ public class CombineTest implements Serializable {
.apply(
Combine.<String, Integer, String>perKey(
new TestCombineFnWithContext(globallyFixedWindowsView))
- .withSideInputs(Arrays.asList(globallyFixedWindowsView)));
+ .withSideInputs(globallyFixedWindowsView));
PCollection<String> sessionsCombineGlobally =
globallyInput
@@ -481,13 +474,12 @@ public class CombineTest implements Serializable {
.apply(
Combine.globally(new TestCombineFnWithContext(globallyFixedWindowsView))
.withoutDefaults()
- .withSideInputs(Arrays.asList(globallyFixedWindowsView)));
+ .withSideInputs(globallyFixedWindowsView));
PAssert.that(fixedWindowsSum).containsInAnyOrder(2, 4, 1, 13);
- PAssert.that(sessionsCombinePerKey).containsInAnyOrder(
- KV.of("a", "1:114"),
- KV.of("b", "1:1"),
- KV.of("b", "0:13"));
+ PAssert.that(sessionsCombinePerKey)
+ .containsInAnyOrder(
+ Arrays.asList(KV.of("a", "1:114"), KV.of("b", "1:1"), KV.of("b", "0:13")));
PAssert.that(sessionsCombineGlobally).containsInAnyOrder("1:1114", "0:13");
pipeline.run();
}
@@ -716,7 +708,7 @@ public class CombineTest implements Serializable {
pipeline
.apply(
"CreateMainInput",
- Create.<Void>timestamped(nonEmptyElement, emptyElement).withCoder(VoidCoder.of()))
+ Create.timestamped(nonEmptyElement, emptyElement).withCoder(VoidCoder.of()))
.apply("WindowMainInput", Window.<Void>into(windowFn))
.apply(
"OutputSideInput",
@@ -941,15 +933,13 @@ public class CombineTest implements Serializable {
*/
private class CountSumCoder extends AtomicCoder<CountSum> {
@Override
- public void encode(CountSum value, OutputStream outStream)
- throws CoderException, IOException {
+ public void encode(CountSum value, OutputStream outStream) throws IOException {
LONG_CODER.encode(value.count, outStream);
DOUBLE_CODER.encode(value.sum, outStream);
}
@Override
- public CountSum decode(InputStream inStream)
- throws CoderException, IOException {
+ public CountSum decode(InputStream inStream) throws IOException {
long count = LONG_CODER.decode(inStream);
double sum = DOUBLE_CODER.decode(inStream);
return new CountSum(count, sum);
@@ -992,28 +982,15 @@ public class CombineTest implements Serializable {
public static Coder<Accumulator> getCoder() {
return new AtomicCoder<Accumulator>() {
@Override
- public void encode(Accumulator accumulator, OutputStream outStream)
- throws CoderException, IOException {
- encode(accumulator, outStream, Coder.Context.NESTED);
+ public void encode(Accumulator accumulator, OutputStream outStream) throws IOException {
+ StringUtf8Coder.of().encode(accumulator.seed, outStream);
+ StringUtf8Coder.of().encode(accumulator.value, outStream);
}
@Override
- public void encode(Accumulator accumulator, OutputStream outStream, Coder.Context context)
- throws CoderException, IOException {
- StringUtf8Coder.of().encode(accumulator.seed, outStream, context.nested());
- StringUtf8Coder.of().encode(accumulator.value, outStream, context);
- }
-
- @Override
- public Accumulator decode(InputStream inStream) throws CoderException, IOException {
- return decode(inStream, Coder.Context.NESTED);
- }
-
- @Override
- public Accumulator decode(InputStream inStream, Coder.Context context)
- throws CoderException, IOException {
- String seed = StringUtf8Coder.of().decode(inStream, context.nested());
- String value = StringUtf8Coder.of().decode(inStream, context);
+ public Accumulator decode(InputStream inStream) throws IOException {
+ String seed = StringUtf8Coder.of().decode(inStream);
+ String value = StringUtf8Coder.of().decode(inStream);
return new Accumulator(seed, value);
}
};
@@ -1042,18 +1019,22 @@ public class CombineTest implements Serializable {
@Override
public Accumulator mergeAccumulators(Iterable<Accumulator> accumulators) {
- String seed = null;
- String all = "";
+ Accumulator seedAccumulator = null;
+ StringBuilder all = new StringBuilder();
for (Accumulator accumulator : accumulators) {
- if (seed == null) {
- seed = accumulator.seed;
+ if (seedAccumulator == null) {
+ seedAccumulator = accumulator;
} else {
- checkArgument(seed.equals(accumulator.seed), "Different seed values in accumulator");
+ assertEquals(
+ String.format(
+ "Different seed values in accumulator: %s vs. %s", seedAccumulator, accumulator),
+ seedAccumulator.seed,
+ accumulator.seed);
}
- all += accumulator.value;
+ all.append(accumulator.value);
accumulator.value = "cleared in mergeAccumulators";
}
- return new Accumulator(seed, all);
+ return new Accumulator(checkNotNull(seedAccumulator).seed, all.toString());
}
@Override
@@ -1161,7 +1142,7 @@ public class CombineTest implements Serializable {
@Override
public void mergeAccumulator(Counter accumulator) {
checkState(outputs == 0);
- checkArgument(accumulator.outputs == 0);
+ assertEquals(0, accumulator.outputs);
merges += accumulator.merges + 1;
inputs += accumulator.inputs;
[2/2] beam git commit: This closes #3380: A few cleanups in
CombineTest
Posted by jk...@apache.org.
This closes #3380: A few cleanups in CombineTest
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1fb43044
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1fb43044
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1fb43044
Branch: refs/heads/master
Commit: 1fb43044213fa3381c8534da2bc604b72d355805
Parents: c12d6ba cf654a0
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Jun 19 10:23:25 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon Jun 19 10:23:25 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/transforms/CombineTest.java | 125 ++++++++-----------
1 file changed, 53 insertions(+), 72 deletions(-)
----------------------------------------------------------------------