You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/06/02 00:57:02 UTC
[1/3] incubator-beam git commit: Use Structural Value keys instead of
User Values
Repository: incubator-beam
Updated Branches:
refs/heads/master b9a8cbeff -> 1da797d8f
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StructuralKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StructuralKeyTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StructuralKeyTest.java
new file mode 100644
index 0000000..26514f0
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StructuralKeyTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.theInstance;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link StructuralKey}.
+ */
+@RunWith(JUnit4.class)
+public class StructuralKeyTest {
+ @Test
+ public void getKeyEqualToOldKey() {
+ assertThat(StructuralKey.of(1234, VarIntCoder.of()).getKey(), equalTo(1234));
+ assertThat(StructuralKey.of("foobar", StringUtf8Coder.of()).getKey(), equalTo("foobar"));
+ assertArrayEquals(StructuralKey.of(new byte[] {2, 9, -22}, ByteArrayCoder.of()).getKey(),
+ new byte[] {2, 9, -22});
+ }
+
+ @Test
+ public void getKeyNotSameInstance() {
+ byte[] original = new byte[] {1, 4, 9, 127, -22};
+ StructuralKey<byte[]> key = StructuralKey.of(original, ByteArrayCoder.of());
+
+ assertThat(key.getKey(), not(theInstance(original)));
+ }
+
+ @Test
+ public void objectEqualsTrueKeyEquals() {
+ StructuralKey<Integer> original = StructuralKey.of(1234, VarIntCoder.of());
+ assertThat(StructuralKey.of(1234, VarIntCoder.of()), equalTo(original));
+ }
+
+ @Test
+ public void objectsNotEqualEncodingsEqualEquals() {
+ byte[] original = new byte[] {1, 4, 9, 127, -22};
+ StructuralKey<byte[]> key = StructuralKey.of(original, ByteArrayCoder.of());
+
+ StructuralKey<byte[]> otherKey =
+ StructuralKey.of(new byte[] {1, 4, 9, 127, -22}, ByteArrayCoder.of());
+ assertThat(key, equalTo(otherKey));
+ }
+
+ @Test
+ public void notEqualEncodingsEqual() {
+ byte[] original = new byte[] {1, 4, 9, 127, -22};
+ StructuralKey<byte[]> key = StructuralKey.of(original, ByteArrayCoder.of());
+
+ StructuralKey<byte[]> otherKey =
+ StructuralKey.of(new byte[] {9, -128, 22}, ByteArrayCoder.of());
+ assertThat(key, not(equalTo(otherKey)));
+ }
+}
[3/3] incubator-beam git commit: Closes #411
Posted by dh...@apache.org.
Closes #411
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1da797d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1da797d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1da797d8
Branch: refs/heads/master
Commit: 1da797d8f5dfae73cd7574c2d388e9ba3317a839
Parents: b9a8cbe e3cc4fa
Author: Dan Halperin <dh...@google.com>
Authored: Wed Jun 1 17:56:38 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jun 1 17:56:38 2016 -0700
----------------------------------------------------------------------
.../beam/runners/direct/BundleFactory.java | 4 +-
.../direct/ExecutorServiceParallelExecutor.java | 8 +-
.../ImmutabilityCheckingBundleFactory.java | 4 +-
.../direct/InMemoryWatermarkManager.java | 70 ++++++++-------
.../runners/direct/InProcessBundleFactory.java | 25 +++---
.../direct/InProcessEvaluationContext.java | 10 +--
.../direct/InProcessExecutionContext.java | 4 +-
...InProcessGroupByKeyOnlyEvaluatorFactory.java | 6 +-
.../runners/direct/InProcessPipelineRunner.java | 7 +-
.../apache/beam/runners/direct/StepAndKey.java | 8 +-
.../beam/runners/direct/StructuralKey.java | 77 ++++++++++++++++
.../direct/GroupByKeyEvaluatorFactoryTest.java | 27 ++++--
.../ImmutabilityCheckingBundleFactoryTest.java | 13 ++-
.../direct/InMemoryWatermarkManagerTest.java | 95 +++++++++++---------
.../direct/InProcessBundleFactoryTest.java | 54 ++++++-----
.../direct/InProcessEvaluationContextTest.java | 52 ++++++-----
...ocessGroupByKeyOnlyEvaluatorFactoryTest.java | 33 ++++---
.../direct/InProcessPipelineRunnerTest.java | 60 +++++++++++++
.../direct/InProcessTimerInternalsTest.java | 3 +-
.../direct/ParDoInProcessEvaluatorTest.java | 2 +-
.../direct/ParDoMultiEvaluatorFactoryTest.java | 29 +++---
.../direct/ParDoSingleEvaluatorFactoryTest.java | 35 +++++---
.../beam/runners/direct/StructuralKeyTest.java | 81 +++++++++++++++++
23 files changed, 500 insertions(+), 207 deletions(-)
----------------------------------------------------------------------
[2/3] incubator-beam git commit: Use Structural Value keys instead of
User Values
Posted by dh...@apache.org.
Use Structural Value keys instead of User Values
This fixes problems with lookup by basing entirely on structural
equality.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e3cc4fa4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e3cc4fa4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e3cc4fa4
Branch: refs/heads/master
Commit: e3cc4fa4724625f49e3e6690e878a4713615b2e1
Parents: b9a8cbe
Author: Thomas Groh <tg...@google.com>
Authored: Wed Jun 1 14:28:18 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jun 1 17:56:29 2016 -0700
----------------------------------------------------------------------
.../beam/runners/direct/BundleFactory.java | 4 +-
.../direct/ExecutorServiceParallelExecutor.java | 8 +-
.../ImmutabilityCheckingBundleFactory.java | 4 +-
.../direct/InMemoryWatermarkManager.java | 70 ++++++++-------
.../runners/direct/InProcessBundleFactory.java | 25 +++---
.../direct/InProcessEvaluationContext.java | 10 +--
.../direct/InProcessExecutionContext.java | 4 +-
...InProcessGroupByKeyOnlyEvaluatorFactory.java | 6 +-
.../runners/direct/InProcessPipelineRunner.java | 7 +-
.../apache/beam/runners/direct/StepAndKey.java | 8 +-
.../beam/runners/direct/StructuralKey.java | 77 ++++++++++++++++
.../direct/GroupByKeyEvaluatorFactoryTest.java | 27 ++++--
.../ImmutabilityCheckingBundleFactoryTest.java | 13 ++-
.../direct/InMemoryWatermarkManagerTest.java | 95 +++++++++++---------
.../direct/InProcessBundleFactoryTest.java | 54 ++++++-----
.../direct/InProcessEvaluationContextTest.java | 52 ++++++-----
...ocessGroupByKeyOnlyEvaluatorFactoryTest.java | 33 ++++---
.../direct/InProcessPipelineRunnerTest.java | 60 +++++++++++++
.../direct/InProcessTimerInternalsTest.java | 3 +-
.../direct/ParDoInProcessEvaluatorTest.java | 2 +-
.../direct/ParDoMultiEvaluatorFactoryTest.java | 29 +++---
.../direct/ParDoSingleEvaluatorFactoryTest.java | 35 +++++---
.../beam/runners/direct/StructuralKeyTest.java | 81 +++++++++++++++++
23 files changed, 500 insertions(+), 207 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
index fea4841..a0511df 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
@@ -44,6 +44,6 @@ public interface BundleFactory {
* {@link InProcessGroupByKeyOnly} {@link PTransform PTransforms}. Elements added to the bundle
* belong to the {@code output} {@link PCollection}.
*/
- public <T> UncommittedBundle<T> createKeyedBundle(
- CommittedBundle<?> input, Object key, PCollection<T> output);
+ public <K, T> UncommittedBundle<T> createKeyedBundle(
+ CommittedBundle<?> input, StructuralKey<K> key, PCollection<T> output);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 70a8035..a627125 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -396,17 +396,19 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
private boolean fireTimers() throws Exception {
try {
boolean firedTimers = false;
- for (Map.Entry<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> transformTimers :
+ for (Map.Entry<
+ AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> transformTimers :
evaluationContext.extractFiredTimers().entrySet()) {
AppliedPTransform<?, ?, ?> transform = transformTimers.getKey();
- for (Map.Entry<Object, FiredTimers> keyTimers : transformTimers.getValue().entrySet()) {
+ for (Map.Entry<StructuralKey<?>, FiredTimers> keyTimers :
+ transformTimers.getValue().entrySet()) {
for (TimeDomain domain : TimeDomain.values()) {
Collection<TimerData> delivery = keyTimers.getValue().getTimers(domain);
if (delivery.isEmpty()) {
continue;
}
KeyedWorkItem<Object, Object> work =
- KeyedWorkItems.timersWorkItem(keyTimers.getKey(), delivery);
+ KeyedWorkItems.timersWorkItem(keyTimers.getKey().getKey(), delivery);
@SuppressWarnings({"unchecked", "rawtypes"})
CommittedBundle<?> bundle =
evaluationContext
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
index 3b38211..92a57dd 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
@@ -72,8 +72,8 @@ class ImmutabilityCheckingBundleFactory implements BundleFactory {
}
@Override
- public <T> UncommittedBundle<T> createKeyedBundle(
- CommittedBundle<?> input, Object key, PCollection<T> output) {
+ public <K, T> UncommittedBundle<T> createKeyedBundle(
+ CommittedBundle<?> input, StructuralKey<K> key, PCollection<T> output) {
return new ImmutabilityEnforcingBundle<>(underlying.createKeyedBundle(input, key, output));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
index f8cf343..95095fa 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
@@ -207,7 +207,7 @@ public class InMemoryWatermarkManager {
private static class AppliedPTransformInputWatermark implements Watermark {
private final Collection<? extends Watermark> inputWatermarks;
private final SortedMultiset<WindowedValue<?>> pendingElements;
- private final Map<Object, NavigableSet<TimerData>> objectTimers;
+ private final Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers;
private AtomicReference<Instant> currentWatermark;
@@ -286,7 +286,7 @@ public class InMemoryWatermarkManager {
// We don't keep references to timers that have been fired and delivered via #getFiredTimers()
}
- private synchronized Map<Object, List<TimerData>> extractFiredEventTimeTimers() {
+ private synchronized Map<StructuralKey<?>, List<TimerData>> extractFiredEventTimeTimers() {
return extractFiredTimers(currentWatermark.get(), objectTimers);
}
@@ -384,8 +384,8 @@ public class InMemoryWatermarkManager {
private static class SynchronizedProcessingTimeInputWatermark implements Watermark {
private final Collection<? extends Watermark> inputWms;
private final Collection<CommittedBundle<?>> pendingBundles;
- private final Map<Object, NavigableSet<TimerData>> processingTimers;
- private final Map<Object, NavigableSet<TimerData>> synchronizedProcessingTimers;
+ private final Map<StructuralKey<?>, NavigableSet<TimerData>> processingTimers;
+ private final Map<StructuralKey<?>, NavigableSet<TimerData>> synchronizedProcessingTimers;
private final PriorityQueue<TimerData> pendingTimers;
@@ -490,9 +490,9 @@ public class InMemoryWatermarkManager {
}
}
- private synchronized Map<Object, List<TimerData>> extractFiredDomainTimers(
+ private synchronized Map<StructuralKey<?>, List<TimerData>> extractFiredDomainTimers(
TimeDomain domain, Instant firingTime) {
- Map<Object, List<TimerData>> firedTimers;
+ Map<StructuralKey<?>, List<TimerData>> firedTimers;
switch (domain) {
case PROCESSING_TIME:
firedTimers = extractFiredTimers(firingTime, processingTimers);
@@ -509,13 +509,14 @@ public class InMemoryWatermarkManager {
+ " and gave a non-processing time domain "
+ domain);
}
- for (Map.Entry<Object, ? extends Collection<TimerData>> firedTimer : firedTimers.entrySet()) {
+ for (Map.Entry<StructuralKey<?>, ? extends Collection<TimerData>> firedTimer :
+ firedTimers.entrySet()) {
pendingTimers.addAll(firedTimer.getValue());
}
return firedTimers;
}
- private Map<TimeDomain, NavigableSet<TimerData>> timerMap(Object key) {
+ private Map<TimeDomain, NavigableSet<TimerData>> timerMap(StructuralKey<?> key) {
NavigableSet<TimerData> processingQueue = processingTimers.get(key);
if (processingQueue == null) {
processingQueue = new TreeSet<>();
@@ -647,11 +648,12 @@ public class InMemoryWatermarkManager {
*
* The result collection retains ordering of timers (from earliest to latest).
*/
- private static Map<Object, List<TimerData>> extractFiredTimers(
- Instant latestTime, Map<Object, NavigableSet<TimerData>> objectTimers) {
- Map<Object, List<TimerData>> result = new HashMap<>();
- Set<Object> emptyKeys = new HashSet<>();
- for (Map.Entry<Object, NavigableSet<TimerData>> pendingTimers : objectTimers.entrySet()) {
+ private static Map<StructuralKey<?>, List<TimerData>> extractFiredTimers(
+ Instant latestTime, Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers) {
+ Map<StructuralKey<?>, List<TimerData>> result = new HashMap<>();
+ Set<StructuralKey<?>> emptyKeys = new HashSet<>();
+ for (Map.Entry<StructuralKey<?>, NavigableSet<TimerData>> pendingTimers :
+ objectTimers.entrySet()) {
NavigableSet<TimerData> timers = pendingTimers.getValue();
if (!timers.isEmpty() && timers.first().getTimestamp().isBefore(latestTime)) {
ArrayList<TimerData> keyFiredTimers = new ArrayList<>();
@@ -923,11 +925,12 @@ public class InMemoryWatermarkManager {
* Returns a map of each {@link PTransform} that has pending timers to those timers. All of the
* pending timers will be removed from this {@link InMemoryWatermarkManager}.
*/
- public Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> extractFiredTimers() {
- Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> allTimers = new HashMap<>();
+ public Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> extractFiredTimers() {
+ Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> allTimers = new HashMap<>();
for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> watermarksEntry :
transformToWatermarks.entrySet()) {
- Map<Object, FiredTimers> keyFiredTimers = watermarksEntry.getValue().extractFiredTimers();
+ Map<StructuralKey<?>, FiredTimers> keyFiredTimers =
+ watermarksEntry.getValue().extractFiredTimers();
if (!keyFiredTimers.isEmpty()) {
allTimers.put(watermarksEntry.getKey(), keyFiredTimers);
}
@@ -1130,10 +1133,11 @@ public class InMemoryWatermarkManager {
return FluentIterable.from(bundle.getElements()).transformAndConcat(EXPLODE_WINDOWS_FN);
}
- private Map<Object, FiredTimers> extractFiredTimers() {
- Map<Object, List<TimerData>> eventTimeTimers = inputWatermark.extractFiredEventTimeTimers();
- Map<Object, List<TimerData>> processingTimers;
- Map<Object, List<TimerData>> synchronizedTimers;
+ private Map<StructuralKey<?>, FiredTimers> extractFiredTimers() {
+ Map<StructuralKey<?>, List<TimerData>> eventTimeTimers =
+ inputWatermark.extractFiredEventTimeTimers();
+ Map<StructuralKey<?>, List<TimerData>> processingTimers;
+ Map<StructuralKey<?>, List<TimerData>> synchronizedTimers;
if (inputWatermark.get().equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
processingTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers(
TimeDomain.PROCESSING_TIME, BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1145,11 +1149,11 @@ public class InMemoryWatermarkManager {
synchronizedTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers(
TimeDomain.SYNCHRONIZED_PROCESSING_TIME, getSynchronizedProcessingInputTime());
}
- Map<Object, Map<TimeDomain, List<TimerData>>> groupedTimers = new HashMap<>();
+ Map<StructuralKey<?>, Map<TimeDomain, List<TimerData>>> groupedTimers = new HashMap<>();
groupFiredTimers(groupedTimers, eventTimeTimers, processingTimers, synchronizedTimers);
- Map<Object, FiredTimers> keyFiredTimers = new HashMap<>();
- for (Map.Entry<Object, Map<TimeDomain, List<TimerData>>> firedTimers :
+ Map<StructuralKey<?>, FiredTimers> keyFiredTimers = new HashMap<>();
+ for (Map.Entry<StructuralKey<?>, Map<TimeDomain, List<TimerData>>> firedTimers :
groupedTimers.entrySet()) {
keyFiredTimers.put(firedTimers.getKey(), new FiredTimers(firedTimers.getValue()));
}
@@ -1158,10 +1162,10 @@ public class InMemoryWatermarkManager {
@SafeVarargs
private final void groupFiredTimers(
- Map<Object, Map<TimeDomain, List<TimerData>>> groupedToMutate,
- Map<Object, List<TimerData>>... timersToGroup) {
- for (Map<Object, List<TimerData>> subGroup : timersToGroup) {
- for (Map.Entry<Object, List<TimerData>> newTimers : subGroup.entrySet()) {
+ Map<StructuralKey<?>, Map<TimeDomain, List<TimerData>>> groupedToMutate,
+ Map<StructuralKey<?>, List<TimerData>>... timersToGroup) {
+ for (Map<StructuralKey<?>, List<TimerData>> subGroup : timersToGroup) {
+ for (Map.Entry<StructuralKey<?>, List<TimerData>> newTimers : subGroup.entrySet()) {
Map<TimeDomain, List<TimerData>> grouped = groupedToMutate.get(newTimers.getKey());
if (grouped == null) {
grouped = new HashMap<>();
@@ -1196,7 +1200,7 @@ public class InMemoryWatermarkManager {
* the input to the executed step.
*/
public static class TimerUpdate {
- private final Object key;
+ private final StructuralKey<?> key;
private final Iterable<? extends TimerData> completedTimers;
private final Iterable<? extends TimerData> setTimers;
@@ -1217,7 +1221,7 @@ public class InMemoryWatermarkManager {
* Creates a new {@link TimerUpdate} builder with the provided completed timers that needs the
* set and deleted timers to be added to it.
*/
- public static TimerUpdateBuilder builder(Object key) {
+ public static TimerUpdateBuilder builder(StructuralKey<?> key) {
return new TimerUpdateBuilder(key);
}
@@ -1225,12 +1229,12 @@ public class InMemoryWatermarkManager {
* A {@link TimerUpdate} builder that needs to be provided with set timers and deleted timers.
*/
public static final class TimerUpdateBuilder {
- private final Object key;
+ private final StructuralKey<?> key;
private final Collection<TimerData> completedTimers;
private final Collection<TimerData> setTimers;
private final Collection<TimerData> deletedTimers;
- private TimerUpdateBuilder(Object key) {
+ private TimerUpdateBuilder(StructuralKey<?> key) {
this.key = key;
this.completedTimers = new HashSet<>();
this.setTimers = new HashSet<>();
@@ -1280,7 +1284,7 @@ public class InMemoryWatermarkManager {
}
private TimerUpdate(
- Object key,
+ StructuralKey<?> key,
Iterable<? extends TimerData> completedTimers,
Iterable<? extends TimerData> setTimers,
Iterable<? extends TimerData> deletedTimers) {
@@ -1291,7 +1295,7 @@ public class InMemoryWatermarkManager {
}
@VisibleForTesting
- Object getKey() {
+ StructuralKey<?> getKey() {
return key;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java
index bc9b04c..52bc575 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkState;
import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
@@ -29,8 +30,6 @@ import com.google.common.collect.ImmutableList;
import org.joda.time.Instant;
-import javax.annotation.Nullable;
-
/**
* A factory that produces bundles that perform no additional validation.
*/
@@ -43,7 +42,7 @@ class InProcessBundleFactory implements BundleFactory {
@Override
public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) {
- return InProcessBundle.create(output, null);
+ return InProcessBundle.create(output, StructuralKey.of(null, VoidCoder.of()));
}
@Override
@@ -52,8 +51,8 @@ class InProcessBundleFactory implements BundleFactory {
}
@Override
- public <T> UncommittedBundle<T> createKeyedBundle(
- CommittedBundle<?> input, @Nullable Object key, PCollection<T> output) {
+ public <K, T> UncommittedBundle<T> createKeyedBundle(
+ CommittedBundle<?> input, StructuralKey<K> key, PCollection<T> output) {
return InProcessBundle.create(output, key);
}
@@ -62,18 +61,18 @@ class InProcessBundleFactory implements BundleFactory {
*/
private static final class InProcessBundle<T> implements UncommittedBundle<T> {
private final PCollection<T> pcollection;
- @Nullable private final Object key;
+ private final StructuralKey<?> key;
private boolean committed = false;
private ImmutableList.Builder<WindowedValue<T>> elements;
/**
* Create a new {@link InProcessBundle} for the specified {@link PCollection}.
*/
- public static <T> InProcessBundle<T> create(PCollection<T> pcollection, @Nullable Object key) {
- return new InProcessBundle<T>(pcollection, key);
+ public static <T> InProcessBundle<T> create(PCollection<T> pcollection, StructuralKey<?> key) {
+ return new InProcessBundle<>(pcollection, key);
}
- private InProcessBundle(PCollection<T> pcollection, Object key) {
+ private InProcessBundle(PCollection<T> pcollection, StructuralKey<?> key) {
this.pcollection = pcollection;
this.key = key;
this.elements = ImmutableList.builder();
@@ -108,7 +107,7 @@ class InProcessBundleFactory implements BundleFactory {
private static class CommittedInProcessBundle<T> implements CommittedBundle<T> {
public CommittedInProcessBundle(
PCollection<T> pcollection,
- Object key,
+ StructuralKey<?> key,
Iterable<WindowedValue<T>> committedElements,
Instant synchronizedCompletionTime) {
this.pcollection = pcollection;
@@ -118,13 +117,13 @@ class InProcessBundleFactory implements BundleFactory {
}
private final PCollection<T> pcollection;
- private final Object key;
+ /** The structural value key of the Bundle, as specified by the coder that created it. */
+ private final StructuralKey<?> key;
private final Iterable<WindowedValue<T>> committedElements;
private final Instant synchronizedCompletionTime;
@Override
- @Nullable
- public Object getKey() {
+ public StructuralKey<?> getKey() {
return key;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
index 732a279..981a842 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
@@ -233,8 +233,8 @@ class InProcessEvaluationContext {
* Create a {@link UncommittedBundle} with the specified keys at the specified step. For use by
* {@link InProcessGroupByKeyOnly} {@link PTransform PTransforms}.
*/
- public <T> UncommittedBundle<T> createKeyedBundle(
- CommittedBundle<?> input, Object key, PCollection<T> output) {
+ public <K, T> UncommittedBundle<T> createKeyedBundle(
+ CommittedBundle<?> input, StructuralKey<K> key, PCollection<T> output) {
return bundleFactory.createKeyedBundle(input, key, output);
}
@@ -302,7 +302,7 @@ class InProcessEvaluationContext {
* Get an {@link ExecutionContext} for the provided {@link AppliedPTransform} and key.
*/
public InProcessExecutionContext getExecutionContext(
- AppliedPTransform<?, ?, ?> application, Object key) {
+ AppliedPTransform<?, ?, ?> application, StructuralKey<?> key) {
StepAndKey stepAndKey = StepAndKey.of(application, key);
return new InProcessExecutionContext(
options.getClock(),
@@ -372,9 +372,9 @@ class InProcessEvaluationContext {
* <p>This is a destructive operation. Timers will only appear in the result of this method once
* for each time they are set.
*/
- public Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> extractFiredTimers() {
+ public Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> extractFiredTimers() {
forceRefresh();
- Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> fired =
+ Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> fired =
watermarkManager.extractFiredTimers();
return fired;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java
index 44d8bd9..4f10b3a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java
@@ -33,11 +33,11 @@ import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
class InProcessExecutionContext
extends BaseExecutionContext<InProcessExecutionContext.InProcessStepContext> {
private final Clock clock;
- private final Object key;
+ private final StructuralKey<?> key;
private final CopyOnAccessInMemoryStateInternals<Object> existingState;
private final TransformWatermarks watermarks;
- public InProcessExecutionContext(Clock clock, Object key,
+ public InProcessExecutionContext(Clock clock, StructuralKey<?> key,
CopyOnAccessInMemoryStateInternals<Object> existingState, TransformWatermarks watermarks) {
this.clock = clock;
this.key = key;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java
index 79db5b6..a10d496 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java
@@ -147,8 +147,10 @@ class InProcessGroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFacto
K key = groupedEntry.getKey().key;
KeyedWorkItem<K, V> groupedKv =
KeyedWorkItems.elementsWorkItem(key, groupedEntry.getValue());
- UncommittedBundle<KeyedWorkItem<K, V>> bundle =
- evaluationContext.createKeyedBundle(inputBundle, key, application.getOutput());
+ UncommittedBundle<KeyedWorkItem<K, V>> bundle = evaluationContext.createKeyedBundle(
+ inputBundle,
+ StructuralKey.of(key, keyCoder),
+ application.getOutput());
bundle.add(WindowedValue.valueInGlobalWindow(groupedKv));
resultBuilder.addOutput(bundle);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
index 5a04af4..8847c58 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
@@ -60,8 +60,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
-import javax.annotation.Nullable;
-
/**
* An In-Memory implementation of the Dataflow Programming Model. Supports Unbounded
* {@link PCollection PCollections}.
@@ -130,11 +128,10 @@ public class InProcessPipelineRunner
PCollection<T> getPCollection();
/**
- * Returns the (possibly null) key that was output in the most recent {@link GroupByKey} in the
+ * Returns the key that was output in the most recent {@link GroupByKey} in the
* execution of this bundle.
*/
- @Nullable
- Object getKey();
+ StructuralKey<?> getKey();
/**
* Returns an {@link Iterable} containing all of the elements that have been added to this
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java
index 1c7cf6c..18fe04f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java
@@ -29,16 +29,16 @@ import java.util.Objects;
*/
final class StepAndKey {
private final AppliedPTransform<?, ?, ?> step;
- private final Object key;
+ private final StructuralKey<?> key;
/**
* Create a new {@link StepAndKey} with the provided step and key.
*/
- public static StepAndKey of(AppliedPTransform<?, ?, ?> step, Object key) {
+ public static StepAndKey of(AppliedPTransform<?, ?, ?> step, StructuralKey<?> key) {
return new StepAndKey(step, key);
}
- private StepAndKey(AppliedPTransform<?, ?, ?> step, Object key) {
+ private StepAndKey(AppliedPTransform<?, ?, ?> step, StructuralKey<?> key) {
this.step = step;
this.key = key;
}
@@ -47,7 +47,7 @@ final class StepAndKey {
public String toString() {
return MoreObjects.toStringHelper(StepAndKey.class)
.add("step", step.getFullName())
- .add("key", key)
+ .add("key", key.getKey())
.toString();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StructuralKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StructuralKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StructuralKey.java
new file mode 100644
index 0000000..249ccfe
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StructuralKey.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.util.CoderUtils;
+
+/**
+ * A (Key, Coder) pair that uses the structural value of the key (as provided by
+ * {@link Coder#structuralValue(Object)}) to perform equality and hashing.
+ */
+class StructuralKey<K> {
+ /**
+ * Create a new Structural Key of the provided key that can be encoded by the provided coder.
+ */
+ public static <K> StructuralKey<K> of(K key, Coder<K> coder) {
+ try {
+ return new StructuralKey<>(coder, key);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ "Could not encode a key with its provided coder " + coder.getClass().getSimpleName(), e);
+ }
+ }
+
+ private final Coder<K> coder;
+ private final Object structuralValue;
+ private final byte[] encoded;
+
+ private StructuralKey(Coder<K> coder, K key) throws Exception {
+ this.coder = coder;
+ this.structuralValue = coder.structuralValue(key);
+ this.encoded = CoderUtils.encodeToByteArray(coder, key);
+ }
+
+ public K getKey() {
+ try {
+ return CoderUtils.decodeFromByteArray(coder, encoded);
+ } catch (CoderException e) {
+ throw new IllegalArgumentException(
+ "Could not decode Key with coder of type " + coder.getClass().getSimpleName());
+ }
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == this) {
+ return true;
+ }
+ if (other instanceof StructuralKey) {
+ StructuralKey that = (StructuralKey) other;
+ return structuralValue.equals(that.structuralValue);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return structuralValue.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
index 92f845c..a4f900c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
@@ -26,6 +26,7 @@ import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
@@ -72,17 +73,27 @@ public class GroupByKeyEvaluatorFactoryTest {
CommittedBundle<KV<String, WindowedValue<Integer>>> inputBundle =
bundleFactory.createRootBundle(kvs).commit(Instant.now());
InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
-
+ StructuralKey<String> fooKey = StructuralKey.of("foo", StringUtf8Coder.of());
UncommittedBundle<KeyedWorkItem<String, Integer>> fooBundle =
- bundleFactory.createKeyedBundle(null, "foo", groupedKvs);
+ bundleFactory.createKeyedBundle(null, fooKey, groupedKvs);
+
+ StructuralKey<String> barKey = StructuralKey.of("bar", StringUtf8Coder.of());
UncommittedBundle<KeyedWorkItem<String, Integer>> barBundle =
- bundleFactory.createKeyedBundle(null, "bar", groupedKvs);
- UncommittedBundle<KeyedWorkItem<String, Integer>> bazBundle =
- bundleFactory.createKeyedBundle(null, "baz", groupedKvs);
+ bundleFactory.createKeyedBundle(null, barKey, groupedKvs);
- when(evaluationContext.createKeyedBundle(inputBundle, "foo", groupedKvs)).thenReturn(fooBundle);
- when(evaluationContext.createKeyedBundle(inputBundle, "bar", groupedKvs)).thenReturn(barBundle);
- when(evaluationContext.createKeyedBundle(inputBundle, "baz", groupedKvs)).thenReturn(bazBundle);
+ StructuralKey<String> bazKey = StructuralKey.of("baz", StringUtf8Coder.of());
+ UncommittedBundle<KeyedWorkItem<String, Integer>> bazBundle =
+ bundleFactory.createKeyedBundle(null, bazKey, groupedKvs);
+
+ when(evaluationContext.createKeyedBundle(inputBundle,
+ fooKey,
+ groupedKvs)).thenReturn(fooBundle);
+ when(evaluationContext.createKeyedBundle(inputBundle,
+ barKey,
+ groupedKvs)).thenReturn(barBundle);
+ when(evaluationContext.createKeyedBundle(inputBundle,
+ bazKey,
+ groupedKvs)).thenReturn(bazBundle);
// The input to a GroupByKey is assumed to be a KvCoder
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
index 557ebff..2e7847d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertThat;
import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
@@ -75,7 +76,9 @@ public class ImmutabilityCheckingBundleFactoryTest {
@Test
public void noMutationKeyedBundleSucceeds() {
CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now());
- UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root, "mykey", transformed);
+ UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root,
+ StructuralKey.of("mykey", StringUtf8Coder.of()),
+ transformed);
WindowedValue<byte[]> windowedArray =
WindowedValue.of(
@@ -121,7 +124,9 @@ public class ImmutabilityCheckingBundleFactoryTest {
@Test
public void mutationBeforeAddKeyedBundleSucceeds() {
CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now());
- UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root, "mykey", transformed);
+ UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root,
+ StructuralKey.of("mykey", StringUtf8Coder.of()),
+ transformed);
byte[] array = new byte[] {4, 8, 12};
array[0] = Byte.MAX_VALUE;
@@ -172,7 +177,9 @@ public class ImmutabilityCheckingBundleFactoryTest {
@Test
public void mutationAfterAddKeyedBundleThrows() {
CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now());
- UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root, "mykey", transformed);
+ UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root,
+ StructuralKey.of("mykey", StringUtf8Coder.of()),
+ transformed);
byte[] array = new byte[] {4, 8, 12};
WindowedValue<byte[]> windowedArray =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
index 7f202fb..af08d02 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
@@ -31,6 +31,9 @@ import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate.Timer
import org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks;
import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
@@ -397,16 +400,18 @@ public class InMemoryWatermarkManagerTest implements Serializable {
*/
@Test
public void updateWatermarkWithKeyedWatermarkHolds() {
- CommittedBundle<Integer> firstKeyBundle =
- bundleFactory.createKeyedBundle(null, "Odd", createdInts)
- .add(WindowedValue.timestampedValueInGlobalWindow(1, new Instant(1_000_000L)))
- .add(WindowedValue.timestampedValueInGlobalWindow(3, new Instant(-1000L)))
- .commit(clock.now());
+ CommittedBundle<Integer> firstKeyBundle = bundleFactory.createKeyedBundle(null,
+ StructuralKey.of("Odd", StringUtf8Coder.of()),
+ createdInts)
+ .add(WindowedValue.timestampedValueInGlobalWindow(1, new Instant(1_000_000L)))
+ .add(WindowedValue.timestampedValueInGlobalWindow(3, new Instant(-1000L)))
+ .commit(clock.now());
- CommittedBundle<Integer> secondKeyBundle =
- bundleFactory.createKeyedBundle(null, "Even", createdInts)
- .add(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1234L)))
- .commit(clock.now());
+ CommittedBundle<Integer> secondKeyBundle = bundleFactory.createKeyedBundle(null,
+ StructuralKey.of("Even", StringUtf8Coder.of()),
+ createdInts)
+ .add(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1234L)))
+ .commit(clock.now());
manager.updateWatermarks(null,
TimerUpdate.empty(),
@@ -435,8 +440,9 @@ public class InMemoryWatermarkManagerTest implements Serializable {
not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
assertThat(filteredWatermarks.getOutputWatermark(), not(laterThan(new Instant(-1000L))));
- CommittedBundle<Integer> fauxFirstKeyTimerBundle =
- bundleFactory.createKeyedBundle(null, "Odd", createdInts).commit(clock.now());
+ CommittedBundle<Integer> fauxFirstKeyTimerBundle = bundleFactory.createKeyedBundle(null,
+ StructuralKey.of("Odd", StringUtf8Coder.of()),
+ createdInts).commit(clock.now());
manager.updateWatermarks(fauxFirstKeyTimerBundle,
TimerUpdate.empty(),
result(filtered.getProducingTransformInternal(),
@@ -447,8 +453,9 @@ public class InMemoryWatermarkManagerTest implements Serializable {
assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(1234L)));
- CommittedBundle<Integer> fauxSecondKeyTimerBundle =
- bundleFactory.createKeyedBundle(null, "Even", createdInts).commit(clock.now());
+ CommittedBundle<Integer> fauxSecondKeyTimerBundle = bundleFactory.createKeyedBundle(null,
+ StructuralKey.of("Even", StringUtf8Coder.of()),
+ createdInts).commit(clock.now());
manager.updateWatermarks(fauxSecondKeyTimerBundle,
TimerUpdate.empty(),
result(filtered.getProducingTransformInternal(),
@@ -846,13 +853,14 @@ public class InMemoryWatermarkManagerTest implements Serializable {
Instant initialFilteredWm = filteredWms.getSynchronizedProcessingOutputTime();
Instant initialFilteredDoubledWm = filteredDoubledWms.getSynchronizedProcessingOutputTime();
+ StructuralKey<String> key = StructuralKey.of("key", StringUtf8Coder.of());
CommittedBundle<Integer> filteredBundle = multiWindowedBundle(filtered, 2, 8);
TimerData pastTimer =
TimerData.of(StateNamespaces.global(), new Instant(250L), TimeDomain.PROCESSING_TIME);
TimerData futureTimer =
TimerData.of(StateNamespaces.global(), new Instant(4096L), TimeDomain.PROCESSING_TIME);
TimerUpdate timers =
- TimerUpdate.builder("key").setTimer(pastTimer).setTimer(futureTimer).build();
+ TimerUpdate.builder(key).setTimer(pastTimer).setTimer(futureTimer).build();
manager.updateWatermarks(createdBundle,
timers,
result(filtered.getProducingTransformInternal(),
@@ -872,11 +880,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
filteredDoubledWms.getSynchronizedProcessingOutputTime(),
not(earlierThan(initialFilteredDoubledWm)));
- Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firedTimers =
+ Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> firedTimers =
manager.extractFiredTimers();
assertThat(
firedTimers.get(filtered.getProducingTransformInternal())
- .get("key")
+ .get(key)
.getTimers(TimeDomain.PROCESSING_TIME),
contains(pastTimer));
// Our timer has fired, but has not been completed, so it holds our synchronized processing WM
@@ -885,14 +893,14 @@ public class InMemoryWatermarkManagerTest implements Serializable {
CommittedBundle<Integer> filteredTimerBundle =
bundleFactory
- .createKeyedBundle(null, "key", filtered)
+ .createKeyedBundle(null, key, filtered)
.commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
CommittedBundle<Integer> filteredTimerResult =
- bundleFactory.createKeyedBundle(null, "key", filteredTimesTwo)
+ bundleFactory.createKeyedBundle(null, key, filteredTimesTwo)
.commit(filteredWms.getSynchronizedProcessingOutputTime());
// Complete the processing time timer
manager.updateWatermarks(filteredTimerBundle,
- TimerUpdate.builder("key")
+ TimerUpdate.builder(key)
.withCompletedTimers(Collections.<TimerData>singleton(pastTimer)).build(),
result(filtered.getProducingTransformInternal(),
filteredTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
@@ -988,7 +996,9 @@ public class InMemoryWatermarkManagerTest implements Serializable {
TimerData upstreamProcessingTimer =
TimerData.of(StateNamespaces.global(), upstreamHold, TimeDomain.PROCESSING_TIME);
manager.updateWatermarks(created,
- TimerUpdate.builder("key").setTimer(upstreamProcessingTimer).build(),
+ TimerUpdate.builder(StructuralKey.of("key", StringUtf8Coder.of()))
+ .setTimer(upstreamProcessingTimer)
+ .build(),
result(filtered.getProducingTransformInternal(),
created.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(filteredBundle)),
@@ -1009,7 +1019,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
CommittedBundle<Integer> otherCreated = multiWindowedBundle(createdInts, 4, 8, 12);
manager.updateWatermarks(otherCreated,
- TimerUpdate.builder("key")
+ TimerUpdate.builder(StructuralKey.of("key", StringUtf8Coder.of()))
.withCompletedTimers(Collections.singleton(upstreamProcessingTimer)).build(),
result(filtered.getProducingTransformInternal(),
otherCreated.withElements(Collections.<WindowedValue<Integer>>emptyList()),
@@ -1032,8 +1042,9 @@ public class InMemoryWatermarkManagerTest implements Serializable {
new Instant(29_919_235L));
Instant upstreamHold = new Instant(2048L);
- CommittedBundle<Integer> filteredBundle =
- bundleFactory.createKeyedBundle(created, "key", filtered).commit(upstreamHold);
+ CommittedBundle<Integer> filteredBundle = bundleFactory.createKeyedBundle(created,
+ StructuralKey.of("key", StringUtf8Coder.of()),
+ filtered).commit(upstreamHold);
manager.updateWatermarks(
created,
TimerUpdate.empty(),
@@ -1053,7 +1064,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
@Test
public void extractFiredTimersReturnsFiredEventTimeTimers() {
- Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> initialTimers =
+ Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> initialTimers =
manager.extractFiredTimers();
// Watermarks haven't advanced
assertThat(initialTimers.entrySet(), emptyIterable());
@@ -1074,7 +1085,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
TimerData.of(StateNamespaces.global(), new Instant(5000L), TimeDomain.EVENT_TIME);
TimerData lastTimer =
TimerData.of(StateNamespaces.global(), new Instant(10000L), TimeDomain.EVENT_TIME);
- Object key = new Object();
+ StructuralKey<byte[]> key = StructuralKey.of(new byte[] {1, 4, 9}, ByteArrayCoder.of());
TimerUpdate update =
TimerUpdate.builder(key)
.setTimer(earliestTimer)
@@ -1090,11 +1101,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
new Instant(1000L));
manager.refreshAll();
- Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firstTransformFiredTimers =
+ Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> firstTransformFiredTimers =
manager.extractFiredTimers();
assertThat(
firstTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue()));
- Map<Object, FiredTimers> firstFilteredTimers =
+ Map<StructuralKey<?>, FiredTimers> firstFilteredTimers =
firstTransformFiredTimers.get(filtered.getProducingTransformInternal());
assertThat(firstFilteredTimers.get(key), not(nullValue()));
FiredTimers firstFired = firstFilteredTimers.get(key);
@@ -1107,11 +1118,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
Collections.<CommittedBundle<?>>emptyList()),
new Instant(50_000L));
manager.refreshAll();
- Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers =
+ Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> secondTransformFiredTimers =
manager.extractFiredTimers();
assertThat(
secondTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue()));
- Map<Object, FiredTimers> secondFilteredTimers =
+ Map<StructuralKey<?>, FiredTimers> secondFilteredTimers =
secondTransformFiredTimers.get(filtered.getProducingTransformInternal());
assertThat(secondFilteredTimers.get(key), not(nullValue()));
FiredTimers secondFired = secondFilteredTimers.get(key);
@@ -1121,7 +1132,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
@Test
public void extractFiredTimersReturnsFiredProcessingTimeTimers() {
- Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> initialTimers =
+ Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> initialTimers =
manager.extractFiredTimers();
// Watermarks haven't advanced
assertThat(initialTimers.entrySet(), emptyIterable());
@@ -1141,7 +1152,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
TimerData.of(StateNamespaces.global(), new Instant(5000L), TimeDomain.PROCESSING_TIME);
TimerData lastTimer =
TimerData.of(StateNamespaces.global(), new Instant(10000L), TimeDomain.PROCESSING_TIME);
- Object key = new Object();
+ StructuralKey<?> key = StructuralKey.of(-12L, VarLongCoder.of());
TimerUpdate update =
TimerUpdate.builder(key)
.setTimer(lastTimer)
@@ -1158,11 +1169,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
new Instant(1000L));
manager.refreshAll();
- Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firstTransformFiredTimers =
+ Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> firstTransformFiredTimers =
manager.extractFiredTimers();
assertThat(
firstTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue()));
- Map<Object, FiredTimers> firstFilteredTimers =
+ Map<StructuralKey<?>, FiredTimers> firstFilteredTimers =
firstTransformFiredTimers.get(filtered.getProducingTransformInternal());
assertThat(firstFilteredTimers.get(key), not(nullValue()));
FiredTimers firstFired = firstFilteredTimers.get(key);
@@ -1176,11 +1187,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
Collections.<CommittedBundle<?>>emptyList()),
new Instant(50_000L));
manager.refreshAll();
- Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers =
+ Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> secondTransformFiredTimers =
manager.extractFiredTimers();
assertThat(
secondTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue()));
- Map<Object, FiredTimers> secondFilteredTimers =
+ Map<StructuralKey<?>, FiredTimers> secondFilteredTimers =
secondTransformFiredTimers.get(filtered.getProducingTransformInternal());
assertThat(secondFilteredTimers.get(key), not(nullValue()));
FiredTimers secondFired = secondFilteredTimers.get(key);
@@ -1190,7 +1201,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
@Test
public void extractFiredTimersReturnsFiredSynchronizedProcessingTimeTimers() {
- Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> initialTimers =
+ Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> initialTimers =
manager.extractFiredTimers();
// Watermarks haven't advanced
assertThat(initialTimers.entrySet(), emptyIterable());
@@ -1210,7 +1221,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
StateNamespaces.global(), new Instant(5000L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
TimerData lastTimer = TimerData.of(
StateNamespaces.global(), new Instant(10000L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
- Object key = new Object();
+ StructuralKey<byte[]> key = StructuralKey.of(new byte[] {2, -2, 22}, ByteArrayCoder.of());
TimerUpdate update =
TimerUpdate.builder(key)
.setTimer(lastTimer)
@@ -1227,11 +1238,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
new Instant(1000L));
manager.refreshAll();
- Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firstTransformFiredTimers =
+ Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> firstTransformFiredTimers =
manager.extractFiredTimers();
assertThat(
firstTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue()));
- Map<Object, FiredTimers> firstFilteredTimers =
+ Map<StructuralKey<?>, FiredTimers> firstFilteredTimers =
firstTransformFiredTimers.get(filtered.getProducingTransformInternal());
assertThat(firstFilteredTimers.get(key), not(nullValue()));
FiredTimers firstFired = firstFilteredTimers.get(key);
@@ -1246,11 +1257,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
Collections.<CommittedBundle<?>>emptyList()),
new Instant(50_000L));
manager.refreshAll();
- Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers =
+ Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> secondTransformFiredTimers =
manager.extractFiredTimers();
assertThat(
secondTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue()));
- Map<Object, FiredTimers> secondFilteredTimers =
+ Map<StructuralKey<?>, FiredTimers> secondFilteredTimers =
secondTransformFiredTimers.get(filtered.getProducingTransformInternal());
assertThat(secondFilteredTimers.get(key), not(nullValue()));
FiredTimers secondFired = secondFilteredTimers.get(key);
@@ -1271,7 +1282,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
TimerData.of(StateNamespaces.global(), new Instant(2048L), TimeDomain.EVENT_TIME);
TimerUpdate update =
- TimerUpdate.builder("foo")
+ TimerUpdate.builder(StructuralKey.of("foo", StringUtf8Coder.of()))
.withCompletedTimers(ImmutableList.of(completedOne, completedTwo))
.setTimer(set)
.deletedTimer(deleted)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessBundleFactoryTest.java
index 1809dc6..abe2a19 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessBundleFactoryTest.java
@@ -19,11 +19,15 @@ package org.apache.beam.runners.direct;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.WithKeys;
@@ -69,34 +73,38 @@ public class InProcessBundleFactoryTest {
}
@Test
- public void createRootBundleShouldCreateWithNullKey() {
+ public void createRootBundleShouldCreateWithEmptyKey() {
PCollection<Integer> pcollection = TestPipeline.create().apply(Create.of(1));
UncommittedBundle<Integer> inFlightBundle = bundleFactory.createRootBundle(pcollection);
CommittedBundle<Integer> bundle = inFlightBundle.commit(Instant.now());
- assertThat(bundle.getKey(), nullValue());
+ assertThat(bundle.getKey(),
+ Matchers.<StructuralKey<?>>equalTo(StructuralKey.of(null, VoidCoder.of())));
}
- private void createKeyedBundle(Object key) {
+ private <T> void createKeyedBundle(Coder<T> coder, T key) throws Exception {
PCollection<Integer> pcollection = TestPipeline.create().apply(Create.of(1));
+ StructuralKey skey = StructuralKey.of(key, coder);
UncommittedBundle<Integer> inFlightBundle =
- bundleFactory.createKeyedBundle(null, key, pcollection);
+ bundleFactory.createKeyedBundle(null, skey, pcollection);
CommittedBundle<Integer> bundle = inFlightBundle.commit(Instant.now());
- assertThat(bundle.getKey(), equalTo(key));
+ assertThat(bundle.getKey(), equalTo(skey));
}
@Test
- public void keyedWithNullKeyShouldCreateKeyedBundle() {
- createKeyedBundle(null);
+ public void keyedWithNullKeyShouldCreateKeyedBundle() throws Exception {
+ createKeyedBundle(VoidCoder.of(), null);
}
@Test
- public void keyedWithKeyShouldCreateKeyedBundle() {
- createKeyedBundle(new Object());
+ public void keyedWithKeyShouldCreateKeyedBundle() throws Exception {
+ createKeyedBundle(StringUtf8Coder.of(), "foo");
+ createKeyedBundle(VarIntCoder.of(), 1234);
+ createKeyedBundle(ByteArrayCoder.of(), new byte[] {0, 2, 4, 99});
}
private <T> CommittedBundle<T>
@@ -154,7 +162,7 @@ public class InProcessBundleFactoryTest {
assertThat(withed.getElements(), containsInAnyOrder(firstReplacement, secondReplacement));
assertThat(committed.getElements(), containsInAnyOrder(firstValue, secondValue));
- assertThat(withed.getKey(), equalTo(committed.getKey()));
+ assertThat(withed.getKey(), Matchers.<StructuralKey<?>>equalTo(committed.getKey()));
assertThat(withed.getPCollection(), equalTo(committed.getPCollection()));
assertThat(
withed.getSynchronizedProcessingOutputWatermark(),
@@ -203,21 +211,21 @@ public class InProcessBundleFactoryTest {
@Test
public void createBundleKeyedResultPropagatesKey() {
CommittedBundle<KV<String, Integer>> newBundle =
- bundleFactory
- .createBundle(
- bundleFactory.createKeyedBundle(null, "foo", created).commit(Instant.now()),
- downstream)
- .commit(Instant.now());
- assertThat(newBundle.getKey(), Matchers.<Object>equalTo("foo"));
+ bundleFactory.createBundle(
+ bundleFactory.createKeyedBundle(
+ null,
+ StructuralKey.of("foo", StringUtf8Coder.of()),
+ created).commit(Instant.now()),
+ downstream).commit(Instant.now());
+ assertThat(newBundle.getKey().getKey(), Matchers.<Object>equalTo("foo"));
}
@Test
public void createKeyedBundleKeyed() {
- CommittedBundle<KV<String, Integer>> keyedBundle =
- bundleFactory
- .createKeyedBundle(
- bundleFactory.createRootBundle(created).commit(Instant.now()), "foo", downstream)
- .commit(Instant.now());
- assertThat(keyedBundle.getKey(), Matchers.<Object>equalTo("foo"));
+ CommittedBundle<KV<String, Integer>> keyedBundle = bundleFactory.createKeyedBundle(
+ bundleFactory.createRootBundle(created).commit(Instant.now()),
+ StructuralKey.of("foo", StringUtf8Coder.of()),
+ downstream).commit(Instant.now());
+ assertThat(keyedBundle.getKey().getKey(), Matchers.<Object>equalTo("foo"));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
index 10b8721..18db400 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
@@ -31,6 +31,8 @@ import org.apache.beam.runners.direct.InProcessExecutionContext.InProcessStepCon
import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
import org.apache.beam.runners.direct.InProcessPipelineRunner.PCollectionViewWriter;
import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.CountingInput;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -159,16 +161,16 @@ public class InProcessEvaluationContextTest {
@Test
public void getExecutionContextSameStepSameKeyState() {
InProcessExecutionContext fooContext =
- context.getExecutionContext(created.getProducingTransformInternal(), "foo");
+ context.getExecutionContext(created.getProducingTransformInternal(),
+ StructuralKey.of("foo", StringUtf8Coder.of()));
StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
InProcessStepContext stepContext = fooContext.getOrCreateStepContext("s1", "s1");
stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1);
- context.handleResult(
- InProcessBundleFactory.create()
- .createKeyedBundle(null, "foo", created)
+ context.handleResult(InProcessBundleFactory.create()
+ .createKeyedBundle(null, StructuralKey.of("foo", StringUtf8Coder.of()), created)
.commit(Instant.now()),
ImmutableList.<TimerData>of(),
StepTransformResult.withoutHold(created.getProducingTransformInternal())
@@ -176,7 +178,8 @@ public class InProcessEvaluationContextTest {
.build());
InProcessExecutionContext secondFooContext =
- context.getExecutionContext(created.getProducingTransformInternal(), "foo");
+ context.getExecutionContext(created.getProducingTransformInternal(),
+ StructuralKey.of("foo", StringUtf8Coder.of()));
assertThat(
secondFooContext
.getOrCreateStepContext("s1", "s1")
@@ -190,7 +193,8 @@ public class InProcessEvaluationContextTest {
@Test
public void getExecutionContextDifferentKeysIndependentState() {
InProcessExecutionContext fooContext =
- context.getExecutionContext(created.getProducingTransformInternal(), "foo");
+ context.getExecutionContext(created.getProducingTransformInternal(),
+ StructuralKey.of("foo", StringUtf8Coder.of()));
StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
@@ -201,7 +205,8 @@ public class InProcessEvaluationContextTest {
.add(1);
InProcessExecutionContext barContext =
- context.getExecutionContext(created.getProducingTransformInternal(), "bar");
+ context.getExecutionContext(created.getProducingTransformInternal(),
+ StructuralKey.of("bar", StringUtf8Coder.of()));
assertThat(barContext, not(equalTo(fooContext)));
assertThat(
barContext
@@ -214,7 +219,7 @@ public class InProcessEvaluationContextTest {
@Test
public void getExecutionContextDifferentStepsIndependentState() {
- String myKey = "foo";
+ StructuralKey<?> myKey = StructuralKey.of("foo", StringUtf8Coder.of());
InProcessExecutionContext fooContext =
context.getExecutionContext(created.getProducingTransformInternal(), myKey);
@@ -269,7 +274,7 @@ public class InProcessEvaluationContextTest {
@Test
public void handleResultStoresState() {
- String myKey = "foo";
+ StructuralKey<?> myKey = StructuralKey.of("foo".getBytes(), ByteArrayCoder.of());
InProcessExecutionContext fooContext =
context.getExecutionContext(downstream.getProducingTransformInternal(), myKey);
@@ -359,7 +364,7 @@ public class InProcessEvaluationContextTest {
.build();
context.handleResult(null, ImmutableList.<TimerData>of(), holdResult);
- String key = "foo";
+ StructuralKey<?> key = StructuralKey.of("foo".length(), VarIntCoder.of());
TimerData toFire =
TimerData.of(StateNamespaces.global(), new Instant(100L), TimeDomain.EVENT_TIME);
InProcessTransformResult timerResult =
@@ -383,11 +388,12 @@ public class InProcessEvaluationContextTest {
// Should cause the downstream timer to fire
context.handleResult(null, ImmutableList.<TimerData>of(), advanceResult);
- Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> fired = context.extractFiredTimers();
+ Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> fired =
+ context.extractFiredTimers();
assertThat(
fired,
Matchers.<AppliedPTransform<?, ?, ?>>hasKey(downstream.getProducingTransformInternal()));
- Map<Object, FiredTimers> downstreamFired =
+ Map<StructuralKey<?>, FiredTimers> downstreamFired =
fired.get(downstream.getProducingTransformInternal());
assertThat(downstreamFired, Matchers.<Object>hasKey(key));
@@ -402,23 +408,27 @@ public class InProcessEvaluationContextTest {
@Test
public void createBundleKeyedResultPropagatesKey() {
+ StructuralKey<String> key = StructuralKey.of("foo", StringUtf8Coder.of());
CommittedBundle<KV<String, Integer>> newBundle =
context
.createBundle(
- bundleFactory.createKeyedBundle(null, "foo", created).commit(Instant.now()),
- downstream)
- .commit(Instant.now());
- assertThat(newBundle.getKey(), Matchers.<Object>equalTo("foo"));
+ bundleFactory.createKeyedBundle(
+ null, key,
+ created).commit(Instant.now()),
+ downstream).commit(Instant.now());
+ assertThat(newBundle.getKey(), Matchers.<StructuralKey<?>>equalTo(key));
}
@Test
public void createKeyedBundleKeyed() {
+ StructuralKey<String> key = StructuralKey.of("foo", StringUtf8Coder.of());
CommittedBundle<KV<String, Integer>> keyedBundle =
- context
- .createKeyedBundle(
- bundleFactory.createRootBundle(created).commit(Instant.now()), "foo", downstream)
- .commit(Instant.now());
- assertThat(keyedBundle.getKey(), Matchers.<Object>equalTo("foo"));
+ context.createKeyedBundle(
+ bundleFactory.createRootBundle(created).commit(Instant.now()),
+ key,
+ downstream).commit(Instant.now());
+ assertThat(keyedBundle.getKey(),
+ Matchers.<StructuralKey<?>>equalTo(key));
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java
index 1172a4d..28a3cf6 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java
@@ -26,6 +26,7 @@ import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
@@ -73,16 +74,28 @@ public class InProcessGroupByKeyOnlyEvaluatorFactoryTest {
bundleFactory.createRootBundle(kvs).commit(Instant.now());
InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
- UncommittedBundle<KeyedWorkItem<String, Integer>> fooBundle =
- bundleFactory.createKeyedBundle(null, "foo", groupedKvs);
- UncommittedBundle<KeyedWorkItem<String, Integer>> barBundle =
- bundleFactory.createKeyedBundle(null, "bar", groupedKvs);
- UncommittedBundle<KeyedWorkItem<String, Integer>> bazBundle =
- bundleFactory.createKeyedBundle(null, "baz", groupedKvs);
-
- when(evaluationContext.createKeyedBundle(inputBundle, "foo", groupedKvs)).thenReturn(fooBundle);
- when(evaluationContext.createKeyedBundle(inputBundle, "bar", groupedKvs)).thenReturn(barBundle);
- when(evaluationContext.createKeyedBundle(inputBundle, "baz", groupedKvs)).thenReturn(bazBundle);
+ StructuralKey<String> fooKey = StructuralKey.of("foo", StringUtf8Coder.of());
+ UncommittedBundle<KeyedWorkItem<String, Integer>> fooBundle = bundleFactory.createKeyedBundle(
+ null, fooKey,
+ groupedKvs);
+ StructuralKey<String> barKey = StructuralKey.of("bar", StringUtf8Coder.of());
+ UncommittedBundle<KeyedWorkItem<String, Integer>> barBundle = bundleFactory.createKeyedBundle(
+ null, barKey,
+ groupedKvs);
+ StructuralKey<String> bazKey = StructuralKey.of("baz", StringUtf8Coder.of());
+ UncommittedBundle<KeyedWorkItem<String, Integer>> bazBundle = bundleFactory.createKeyedBundle(
+ null, bazKey,
+ groupedKvs);
+
+ when(evaluationContext.createKeyedBundle(inputBundle,
+ fooKey,
+ groupedKvs)).thenReturn(fooBundle);
+ when(evaluationContext.createKeyedBundle(inputBundle,
+ barKey,
+ groupedKvs)).thenReturn(barBundle);
+ when(evaluationContext.createKeyedBundle(inputBundle,
+ bazKey,
+ groupedKvs)).thenReturn(bazBundle);
// The input to a GroupByKey is assumed to be a KvCoder
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
index 5a92ce3..9314f5e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
@@ -18,23 +18,34 @@
package org.apache.beam.runners.direct;
import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.fail;
import org.apache.beam.runners.direct.InProcessPipelineRunner.InProcessPipelineResult;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+import com.google.common.collect.ImmutableMap;
import com.fasterxml.jackson.annotation.JsonValue;
+
import org.junit.Rule;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
@@ -43,6 +54,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.io.Serializable;
+import java.util.Map;
/**
* Tests for basic {@link InProcessPipelineRunner} functionality.
@@ -79,6 +91,54 @@ public class InProcessPipelineRunnerTest implements Serializable {
result.awaitCompletion();
}
+ @Test(timeout = 5000L)
+ public void byteArrayCountShouldSucceed() {
+ Pipeline p = getPipeline();
+
+ SerializableFunction<Integer, byte[]> getBytes = new SerializableFunction<Integer, byte[]>() {
+ @Override
+ public byte[] apply(Integer input) {
+ try {
+ return CoderUtils.encodeToByteArray(VarIntCoder.of(), input);
+ } catch (CoderException e) {
+ fail("Unexpected Coder Exception " + e);
+ throw new AssertionError("Unreachable");
+ }
+ }
+ };
+ TypeDescriptor<byte[]> td = new TypeDescriptor<byte[]>() {
+ };
+ PCollection<byte[]> foos =
+ p.apply(Create.of(1, 1, 1, 2, 2, 3)).apply(MapElements.via(getBytes).withOutputType(td));
+ PCollection<byte[]> msync =
+ p.apply(Create.of(1, -2, -8, -16)).apply(MapElements.via(getBytes).withOutputType(td));
+ PCollection<byte[]> bytes =
+ PCollectionList.of(foos).and(msync).apply(Flatten.<byte[]>pCollections());
+ PCollection<KV<byte[], Long>> counts = bytes.apply(Count.<byte[]>perElement());
+ PCollection<KV<Integer, Long>> countsBackToString =
+ counts.apply(MapElements.via(new SimpleFunction<KV<byte[], Long>, KV<Integer, Long>>() {
+ @Override
+ public KV<Integer, Long> apply(KV<byte[], Long> input) {
+ try {
+ return KV.of(CoderUtils.decodeFromByteArray(VarIntCoder.of(), input.getKey()),
+ input.getValue());
+ } catch (CoderException e) {
+ fail("Unexpected Coder Exception " + e);
+ throw new AssertionError("Unreachable");
+ }
+ }
+ }));
+
+ Map<Integer, Long> expected = ImmutableMap.<Integer, Long>builder().put(1, 4L)
+ .put(2, 2L)
+ .put(3, 1L)
+ .put(-2, 1L)
+ .put(-8, 1L)
+ .put(-16, 1L)
+ .build();
+ PAssert.thatMap(countsBackToString).isEqualTo(expected);
+ }
+
@Test
public void transformDisplayDataExceptionShouldFail() {
DoFn<Integer, Integer> brokenDoFn = new DoFn<Integer, Integer>() {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessTimerInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessTimerInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessTimerInternalsTest.java
index 34a8980..3e01f44 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessTimerInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessTimerInternalsTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.when;
import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate.TimerUpdateBuilder;
import org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks;
+import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals.TimerData;
import org.apache.beam.sdk.util.state.StateNamespaces;
@@ -55,7 +56,7 @@ public class InProcessTimerInternalsTest {
MockitoAnnotations.initMocks(this);
clock = MockClock.fromInstant(new Instant(0));
- timerUpdateBuilder = TimerUpdate.builder(1234);
+ timerUpdateBuilder = TimerUpdate.builder(StructuralKey.of(1234, VarIntCoder.of()));
internals = InProcessTimerInternals.create(clock, watermarks, timerUpdateBuilder);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java
index ca15d9c..1127ed2 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java
@@ -154,7 +154,7 @@ public class ParDoInProcessEvaluatorTest {
when(stepContext.getTimerUpdate()).thenReturn(TimerUpdate.empty());
when(
evaluationContext.getExecutionContext(
- Mockito.any(AppliedPTransform.class), Mockito.any(Object.class)))
+ Mockito.any(AppliedPTransform.class), Mockito.any(StructuralKey.class)))
.thenReturn(executionContext);
when(evaluationContext.createCounterSet()).thenReturn(new CounterSet());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
index cecfe01..a6f31c0 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
@@ -114,8 +114,8 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
InProcessExecutionContext executionContext =
new InProcessExecutionContext(null, null, null, null);
- when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null))
- .thenReturn(executionContext);
+ when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
+ inputBundle.getKey())).thenReturn(executionContext);
CounterSet counters = new CounterSet();
when(evaluationContext.createCounterSet()).thenReturn(counters);
@@ -199,8 +199,8 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
InProcessExecutionContext executionContext =
new InProcessExecutionContext(null, null, null, null);
- when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null))
- .thenReturn(executionContext);
+ when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
+ inputBundle.getKey())).thenReturn(executionContext);
CounterSet counters = new CounterSet();
when(evaluationContext.createCounterSet()).thenReturn(counters);
@@ -287,10 +287,12 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
when(evaluationContext.createBundle(inputBundle, elementOutput))
.thenReturn(elementOutputBundle);
- InProcessExecutionContext executionContext =
- new InProcessExecutionContext(null, "myKey", null, null);
- when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null))
- .thenReturn(executionContext);
+ InProcessExecutionContext executionContext = new InProcessExecutionContext(null,
+ StructuralKey.of("myKey", StringUtf8Coder.of()),
+ null,
+ null);
+ when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
+ inputBundle.getKey())).thenReturn(executionContext);
CounterSet counters = new CounterSet();
when(evaluationContext.createCounterSet()).thenReturn(counters);
@@ -397,10 +399,11 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
when(evaluationContext.createBundle(inputBundle, elementOutput))
.thenReturn(elementOutputBundle);
- InProcessExecutionContext executionContext =
- new InProcessExecutionContext(null, "myKey", null, null);
- when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null))
- .thenReturn(executionContext);
+ InProcessExecutionContext executionContext = new InProcessExecutionContext(null,
+ StructuralKey.of("myKey", StringUtf8Coder.of()),
+ null, null);
+ when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
+ inputBundle.getKey())).thenReturn(executionContext);
CounterSet counters = new CounterSet();
when(evaluationContext.createCounterSet()).thenReturn(counters);
@@ -419,7 +422,7 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
assertThat(
result.getTimerUpdate(),
equalTo(
- TimerUpdate.builder("myKey")
+ TimerUpdate.builder(StructuralKey.of("myKey", StringUtf8Coder.of()))
.setTimer(addedTimer)
.setTimer(addedTimer)
.setTimer(addedTimer)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
index 236ad17..a1480e5 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
@@ -90,8 +90,8 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle);
InProcessExecutionContext executionContext =
new InProcessExecutionContext(null, null, null, null);
- when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(), null))
- .thenReturn(executionContext);
+ when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(),
+ inputBundle.getKey())).thenReturn(executionContext);
CounterSet counters = new CounterSet();
when(evaluationContext.createCounterSet()).thenReturn(counters);
@@ -142,8 +142,8 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle);
InProcessExecutionContext executionContext =
new InProcessExecutionContext(null, null, null, null);
- when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(), null))
- .thenReturn(executionContext);
+ when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(),
+ inputBundle.getKey())).thenReturn(executionContext);
CounterSet counters = new CounterSet();
when(evaluationContext.createCounterSet()).thenReturn(counters);
@@ -204,9 +204,11 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
- InProcessExecutionContext executionContext =
- new InProcessExecutionContext(null, "myKey", null, null);
- when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null))
+ InProcessExecutionContext executionContext = new InProcessExecutionContext(null,
+ StructuralKey.of("myKey", StringUtf8Coder.of()),
+ null, null);
+ when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
+ inputBundle.getKey()))
.thenReturn(executionContext);
CounterSet counters = new CounterSet();
when(evaluationContext.createCounterSet()).thenReturn(counters);
@@ -292,6 +294,7 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
});
PCollection<KV<String, Integer>> mainOutput = input.apply(pardo);
+ StructuralKey<?> key = StructuralKey.of("myKey", StringUtf8Coder.of());
CommittedBundle<String> inputBundle =
bundleFactory.createRootBundle(input).commit(Instant.now());
@@ -301,9 +304,12 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
- InProcessExecutionContext executionContext =
- new InProcessExecutionContext(null, "myKey", null, null);
- when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null))
+ InProcessExecutionContext executionContext = new InProcessExecutionContext(null,
+ key,
+ null,
+ null);
+ when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
+ inputBundle.getKey()))
.thenReturn(executionContext);
CounterSet counters = new CounterSet();
when(evaluationContext.createCounterSet()).thenReturn(counters);
@@ -316,9 +322,10 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
InProcessTransformResult result = evaluator.finishBundle();
- assertThat(
- result.getTimerUpdate(),
- equalTo(
- TimerUpdate.builder("myKey").setTimer(addedTimer).deletedTimer(deletedTimer).build()));
+ assertThat(result.getTimerUpdate(),
+ equalTo(TimerUpdate.builder(StructuralKey.of("myKey", StringUtf8Coder.of()))
+ .setTimer(addedTimer)
+ .deletedTimer(deletedTimer)
+ .build()));
}
}