You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/10/06 22:31:19 UTC
[3/4] incubator-beam git commit: Remove KeyedResourcePool
Remove KeyedResourcePool
This interface is no longer used. Instead, the runner ensures that
bundles will be provided containing the appropriate input to the
TestStreamEvaluatorFactory.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/41fb16f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/41fb16f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/41fb16f0
Branch: refs/heads/master
Commit: 41fb16f014a79d2b9c149c5b369db12b61c4c774
Parents: 7306e16
Author: Thomas Groh <tg...@google.com>
Authored: Wed Oct 5 13:12:48 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Oct 6 15:14:38 2016 -0700
----------------------------------------------------------------------
.../direct/BoundedReadEvaluatorFactory.java | 40 +++--
.../beam/runners/direct/DirectRunner.java | 2 +
.../beam/runners/direct/EmptyInputProvider.java | 49 ++++++
.../direct/ExecutorServiceParallelExecutor.java | 27 ++-
.../runners/direct/FlattenEvaluatorFactory.java | 18 +-
.../beam/runners/direct/KeyedResourcePool.java | 47 ------
.../runners/direct/LockedKeyedResourcePool.java | 95 -----------
.../beam/runners/direct/RootInputProvider.java | 41 +++++
.../runners/direct/RootProviderRegistry.java | 65 ++++++++
.../direct/RootTransformEvaluatorFactory.java | 42 -----
.../direct/TestStreamEvaluatorFactory.java | 39 +++--
.../direct/TransformEvaluatorRegistry.java | 17 +-
.../direct/UnboundedReadEvaluatorFactory.java | 56 ++++---
.../direct/BoundedReadEvaluatorFactoryTest.java | 3 +-
.../direct/FlattenEvaluatorFactoryTest.java | 3 +-
.../direct/LockedKeyedResourcePoolTest.java | 163 -------------------
.../direct/TestStreamEvaluatorFactoryTest.java | 3 +-
.../UnboundedReadEvaluatorFactoryTest.java | 8 +-
18 files changed, 269 insertions(+), 449 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index 4936ad9..326a535 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -39,28 +39,13 @@ import org.apache.beam.sdk.values.PCollection;
* A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators}
* for the {@link Bounded Read.Bounded} primitive {@link PTransform}.
*/
-final class BoundedReadEvaluatorFactory implements RootTransformEvaluatorFactory {
+final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
private final EvaluationContext evaluationContext;
BoundedReadEvaluatorFactory(EvaluationContext evaluationContext) {
this.evaluationContext = evaluationContext;
}
- @Override
- public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
- return createInitialSplits((AppliedPTransform) transform);
- }
-
- private <OutputT> Collection<CommittedBundle<?>> createInitialSplits(
- AppliedPTransform<?, ?, Read.Bounded<OutputT>> transform) {
- BoundedSource<OutputT> source = transform.getTransform().getSource();
- return Collections.<CommittedBundle<?>>singleton(
- evaluationContext
- .<BoundedSourceShard<OutputT>>createRootBundle()
- .add(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(source)))
- .commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
- }
-
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
@Nullable
@@ -132,4 +117,27 @@ final class BoundedReadEvaluatorFactory implements RootTransformEvaluatorFactory
abstract BoundedSource<T> getSource();
}
+
+ static class InputProvider implements RootInputProvider {
+ private final EvaluationContext evaluationContext;
+
+ InputProvider(EvaluationContext evaluationContext) {
+ this.evaluationContext = evaluationContext;
+ }
+
+ @Override
+ public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
+ return createInitialSplits((AppliedPTransform) transform);
+ }
+
+ private <OutputT> Collection<CommittedBundle<?>> createInitialSplits(
+ AppliedPTransform<?, ?, Read.Bounded<OutputT>> transform) {
+ BoundedSource<OutputT> source = transform.getTransform().getSource();
+ return Collections.<CommittedBundle<?>>singleton(
+ evaluationContext
+ .<BoundedSourceShard<OutputT>>createRootBundle()
+ .add(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(source)))
+ .commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 2ec4f08..67ec3e6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -248,12 +248,14 @@ public class DirectRunner
// independent executor service for each run
ExecutorService executorService = executorServiceSupplier.get();
+ RootInputProvider rootInputProvider = RootProviderRegistry.defaultRegistry(context);
TransformEvaluatorRegistry registry = TransformEvaluatorRegistry.defaultRegistry(context);
PipelineExecutor executor =
ExecutorServiceParallelExecutor.create(
executorService,
consumerTrackingVisitor.getValueToConsumers(),
keyedPValueVisitor.getKeyedPValues(),
+ rootInputProvider,
registry,
defaultModelEnforcements(options),
context);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
new file mode 100644
index 0000000..10d63e9
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+
+/**
+ * A {@link RootInputProvider} that provides a singleton empty bundle.
+ */
+class EmptyInputProvider implements RootInputProvider {
+ private final EvaluationContext evaluationContext;
+
+ EmptyInputProvider(EvaluationContext evaluationContext) {
+ this.evaluationContext = evaluationContext;
+ }
+
+ /**
+ * {@inheritDoc}.
+ *
+ * <p>Returns a single empty bundle. This bundle ensures that any {@link PTransform PTransforms}
+ * that consume from the output of the provided {@link AppliedPTransform} have watermarks updated
+ * as appropriate.
+ */
+ @Override
+ public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
+ return Collections.<CommittedBundle<?>>singleton(
+ evaluationContext.createRootBundle().commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index bb89699..52c45c3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.direct;
+import static com.google.common.base.Preconditions.checkState;
+
import com.google.auto.value.AutoValue;
import com.google.common.base.MoreObjects;
import com.google.common.base.Optional;
@@ -67,6 +69,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers;
private final Set<PValue> keyedPValues;
+ private final RootInputProvider rootInputProvider;
private final TransformEvaluatorRegistry registry;
@SuppressWarnings("rawtypes")
private final Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
@@ -101,18 +104,27 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
ExecutorService executorService,
Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
Set<PValue> keyedPValues,
+ RootInputProvider rootInputProvider,
TransformEvaluatorRegistry registry,
@SuppressWarnings("rawtypes")
- Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> transformEnforcements,
+ Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
+ transformEnforcements,
EvaluationContext context) {
return new ExecutorServiceParallelExecutor(
- executorService, valueToConsumers, keyedPValues, registry, transformEnforcements, context);
+ executorService,
+ valueToConsumers,
+ keyedPValues,
+ rootInputProvider,
+ registry,
+ transformEnforcements,
+ context);
}
private ExecutorServiceParallelExecutor(
ExecutorService executorService,
Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
Set<PValue> keyedPValues,
+ RootInputProvider rootInputProvider,
TransformEvaluatorRegistry registry,
@SuppressWarnings("rawtypes")
Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> transformEnforcements,
@@ -120,6 +132,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
this.executorService = executorService;
this.valueToConsumers = valueToConsumers;
this.keyedPValues = keyedPValues;
+ this.rootInputProvider = rootInputProvider;
this.registry = registry;
this.transformEnforcements = transformEnforcements;
this.evaluationContext = context;
@@ -153,7 +166,12 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
public void start(Collection<AppliedPTransform<?, ?, ?>> roots) {
for (AppliedPTransform<?, ?, ?> root : roots) {
ConcurrentLinkedQueue<CommittedBundle<?>> pending = new ConcurrentLinkedQueue<>();
- pending.addAll(registry.getInitialInputs(root));
+ Collection<CommittedBundle<?>> initialInputs = rootInputProvider.getInitialInputs(root);
+ checkState(
+ !initialInputs.isEmpty(),
+ "All root transforms must have initial inputs. Got 0 for %s",
+ root.getFullName());
+ pending.addAll(initialInputs);
pendingRootBundles.put(root, pending);
}
evaluationContext.initialize(pendingRootBundles);
@@ -385,7 +403,8 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
LOG.debug("Executor Update: {}", update);
if (update.getBundle().isPresent()) {
if (ExecutorState.ACTIVE == startingState
- || (ExecutorState.PROCESSING == startingState && noWorkOutstanding)) {
+ || (ExecutorState.PROCESSING == startingState
+ && noWorkOutstanding)) {
scheduleConsumers(update);
} else {
allUpdates.offer(update);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
index 90db040..57d5628 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
@@ -17,15 +17,12 @@
*/
package org.apache.beam.runners.direct;
-import java.util.Collection;
-import java.util.Collections;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
@@ -34,26 +31,13 @@ import org.apache.beam.sdk.values.PCollectionList;
* The {@link DirectRunner} {@link TransformEvaluatorFactory} for the {@link Flatten}
* {@link PTransform}.
*/
-class FlattenEvaluatorFactory implements RootTransformEvaluatorFactory {
+class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
private final EvaluationContext evaluationContext;
FlattenEvaluatorFactory(EvaluationContext evaluationContext) {
this.evaluationContext = evaluationContext;
}
- /**
- * {@inheritDoc}.
- *
- * <p>Returns a single empty bundle. {@link Flatten} on no inputs produces no outputs. This bundle
- * ensures that any {@link PTransform PTransforms} that consume from the output of the provided
- * {@link AppliedPTransform} have watermarks updated as appropriate.
- */
- @Override
- public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
- return Collections.<CommittedBundle<?>>singleton(
- evaluationContext.createRootBundle().commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
- }
-
@Override
public <InputT> TransformEvaluator<InputT> forApplication(
AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java
deleted file mode 100644
index b976b69..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.direct;
-
-import com.google.common.base.Optional;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-
-/**
- * A pool of resources associated with specific keys. Implementations enforce specific use patterns,
- * such as limiting the the number of outstanding elements available per key.
- */
-interface KeyedResourcePool<K, V> {
- /**
- * Tries to acquire a value for the provided key, loading it via the provided loader if necessary.
- *
- * <p>If the returned {@link Optional} contains a value, the caller obtains ownership of that
- * value. The value should be released back to this {@link KeyedResourcePool} after the
- * caller no longer has use of it using {@link #release(Object, Object)}.
- *
- * <p>The provided {@link Callable} <b>must not</b> return null; it may either return a non-null
- * value or throw an exception.
- */
- Optional<V> tryAcquire(K key, Callable<V> loader) throws ExecutionException;
-
- /**
- * Release the provided value, relinquishing ownership of it. Future calls to
- * {@link #tryAcquire(Object, Callable)} may return the released value.
- */
- void release(K key, V value);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java
deleted file mode 100644
index 8b1e0b1..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.direct;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ExecutionError;
-import com.google.common.util.concurrent.UncheckedExecutionException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-
-/**
- * A {@link KeyedResourcePool} which is limited to at most one outstanding instance at a time for
- * each key.
- */
-class LockedKeyedResourcePool<K, V> implements KeyedResourcePool<K, V> {
- /**
- * A map from each key to an {@link Optional} of the associated value. At most one value is stored
- * per key, and it is obtained by at most one thread at a time.
- *
- * <p>For each key in this map:
- *
- * <ul>
- * <li>If there is no associated value, then no value has been stored yet.
- * <li>If the value is {@code Optional.absent()} then the value is currently in use.
- * <li>If the value is {@code Optional.present()} then the contained value is available for use.
- * </ul>
- */
- public static <K, V> LockedKeyedResourcePool<K, V> create() {
- return new LockedKeyedResourcePool<>();
- }
-
- private final ConcurrentMap<K, Optional<V>> cache;
-
- private LockedKeyedResourcePool() {
- cache = new ConcurrentHashMap<>();
- }
-
- @Override
- public Optional<V> tryAcquire(K key, Callable<V> loader) throws ExecutionException {
- Optional<V> value = cache.replace(key, Optional.<V>absent());
- if (value == null) {
- // No value already existed, so populate the cache with the value returned by the loader
- cache.putIfAbsent(key, Optional.of(load(loader)));
- // Some other thread may obtain the result after the putIfAbsent, so retry acquisition
- value = cache.replace(key, Optional.<V>absent());
- }
- return value;
- }
-
- private V load(Callable<V> loader) throws ExecutionException {
- try {
- return loader.call();
- } catch (Error t) {
- throw new ExecutionError(t);
- } catch (RuntimeException e) {
- throw new UncheckedExecutionException(e);
- } catch (Exception e) {
- throw new ExecutionException(e);
- }
- }
-
- @Override
- public void release(K key, V value) {
- Optional<V> replaced = cache.replace(key, Optional.of(value));
- checkNotNull(replaced, "Tried to release before a value was acquired");
- checkState(
- !replaced.isPresent(),
- "Released a value to a %s where there is already a value present for key %s (%s). "
- + "At most one value may be present at a time.",
- LockedKeyedResourcePool.class.getSimpleName(),
- key,
- replaced);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java
new file mode 100644
index 0000000..40c7301
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import java.util.Collection;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+
+/**
+ * Provides {@link CommittedBundle bundles} that will be provided to the
+ * {@link PTransform PTransforms} that are at the root of a {@link Pipeline}.
+ */
+interface RootInputProvider {
+ /**
+ * Get the initial inputs for the {@link AppliedPTransform}. The {@link AppliedPTransform} will be
+ * provided with these {@link CommittedBundle bundles} as input when the {@link Pipeline} runs.
+ *
+ * <p>For source transforms, these should be sufficient that, when provided to the evaluators
+ * produced by {@link TransformEvaluatorFactory#forApplication(AppliedPTransform,
+ * CommittedBundle)}, all of the elements contained in the source are eventually produced.
+ */
+ Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
new file mode 100644
index 0000000..f6335fd
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
+import org.apache.beam.sdk.transforms.PTransform;
+
+/**
+ * A {@link RootInputProvider} that delegates to primitive {@link RootInputProvider} implementations
+ * based on the type of {@link PTransform} of the application.
+ */
+class RootProviderRegistry implements RootInputProvider {
+ public static RootProviderRegistry defaultRegistry(EvaluationContext context) {
+ ImmutableMap.Builder<Class<? extends PTransform>, RootInputProvider> defaultProviders =
+ ImmutableMap.builder();
+ defaultProviders
+ .put(Read.Bounded.class, new BoundedReadEvaluatorFactory.InputProvider(context))
+ .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory.InputProvider(context))
+ .put(TestStream.class, new TestStreamEvaluatorFactory.InputProvider(context))
+ .put(FlattenPCollectionList.class, new EmptyInputProvider(context));
+ return new RootProviderRegistry(defaultProviders.build());
+ }
+
+ private final Map<Class<? extends PTransform>, RootInputProvider> providers;
+
+ private RootProviderRegistry(Map<Class<? extends PTransform>, RootInputProvider> providers) {
+ this.providers = providers;
+ }
+
+ @Override
+ public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
+ Class<? extends PTransform> transformClass = transform.getTransform().getClass();
+ RootInputProvider provider =
+ checkNotNull(
+ providers.get(transformClass),
+ "Tried to get a %s for a Transform of type %s, but there is no such provider",
+ RootInputProvider.class.getSimpleName(),
+ transformClass.getSimpleName());
+ return provider.getInitialInputs(transform);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootTransformEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootTransformEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootTransformEvaluatorFactory.java
deleted file mode 100644
index 5785dea..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootTransformEvaluatorFactory.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.direct;
-
-import java.util.Collection;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-
-/**
- * A {@link TransformEvaluatorFactory} for {@link PTransform PTransforms} that are at the root of a
- * {@link Pipeline}. Provides a way to get initial inputs, which will cause the {@link PTransform}
- * to produce all of the appropriate output.
- */
-interface RootTransformEvaluatorFactory extends TransformEvaluatorFactory {
- /**
- * Get the initial inputs for the {@link AppliedPTransform}. The {@link AppliedPTransform} will be
- * provided with these {@link CommittedBundle bundles} as input when the {@link Pipeline} runs.
- *
- * <p>For source transforms, these should be sufficient that, when provided to the evaluators
- * produced by {@link #forApplication(AppliedPTransform, CommittedBundle)}, all of the elements
- * contained in the source are eventually produced.
- */
- Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 8e634c8..ffb4fb5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -52,28 +52,13 @@ import org.joda.time.Duration;
import org.joda.time.Instant;
/** The {@link TransformEvaluatorFactory} for the {@link TestStream} primitive. */
-class TestStreamEvaluatorFactory implements RootTransformEvaluatorFactory {
+class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
private final EvaluationContext evaluationContext;
TestStreamEvaluatorFactory(EvaluationContext evaluationContext) {
this.evaluationContext = evaluationContext;
}
- @Override
- public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
- return createInputBundle((AppliedPTransform) transform);
- }
-
- private <T> Collection<CommittedBundle<?>> createInputBundle(
- AppliedPTransform<?, ?, TestStream<T>> transform) {
- CommittedBundle<TestStreamIndex<T>> initialBundle =
- evaluationContext
- .<TestStreamIndex<T>>createRootBundle()
- .add(WindowedValue.valueInGlobalWindow(TestStreamIndex.of(transform.getTransform())))
- .commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
- return Collections.<CommittedBundle<?>>singleton(initialBundle);
- }
-
@Nullable
@Override
public <InputT> TransformEvaluator<InputT> forApplication(
@@ -206,6 +191,28 @@ class TestStreamEvaluatorFactory implements RootTransformEvaluatorFactory {
}
}
+ static class InputProvider implements RootInputProvider {
+ private final EvaluationContext evaluationContext;
+
+ InputProvider(EvaluationContext evaluationContext) {
+ this.evaluationContext = evaluationContext;
+ }
+
+ @Override
+ public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
+ return createInputBundle((AppliedPTransform) transform);
+ }
+
+ private <T> Collection<CommittedBundle<?>> createInputBundle(
+ AppliedPTransform<?, ?, TestStream<T>> transform) {
+ CommittedBundle<TestStreamIndex<T>> initialBundle =
+ evaluationContext
+ .<TestStreamIndex<T>>createRootBundle()
+ .add(WindowedValue.valueInGlobalWindow(TestStreamIndex.of(transform.getTransform())))
+ .commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
+ return Collections.<CommittedBundle<?>>singleton(initialBundle);
+ }
+ }
@AutoValue
abstract static class TestStreamIndex<T> {
static <T> TestStreamIndex<T> of(TestStream<T> stream) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 3332c2a..4b495e6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.runners.direct;
-import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.collect.ImmutableMap;
@@ -42,7 +41,7 @@ import org.slf4j.LoggerFactory;
* A {@link TransformEvaluatorFactory} that delegates to primitive {@link TransformEvaluatorFactory}
* implementations based on the type of {@link PTransform} of the application.
*/
-class TransformEvaluatorRegistry implements RootTransformEvaluatorFactory {
+class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
private static final Logger LOG = LoggerFactory.getLogger(TransformEvaluatorRegistry.class);
public static TransformEvaluatorRegistry defaultRegistry(EvaluationContext ctxt) {
@SuppressWarnings("rawtypes")
@@ -77,20 +76,6 @@ class TransformEvaluatorRegistry implements RootTransformEvaluatorFactory {
}
@Override
- public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
- checkState(
- !finished.get(), "Tried to get initial inputs for a finished TransformEvaluatorRegistry");
- TransformEvaluatorFactory factory = getFactory(transform);
- checkArgument(
- factory instanceof RootTransformEvaluatorFactory,
- "Tried to get Initial Inputs for Transform %s. %s does not have an associated %s",
- transform.getFullName(),
- transform.getTransform().getClass().getSimpleName(),
- RootTransformEvaluatorFactory.class.getSimpleName());
- return ((RootTransformEvaluatorFactory) factory).getInitialInputs(transform);
- }
-
- @Override
public <InputT> TransformEvaluator<InputT> forApplication(
AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle)
throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index 1a89695..08dc286 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -22,8 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
@@ -46,12 +44,11 @@ import org.joda.time.Instant;
* A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators}
* for the {@link Unbounded Read.Unbounded} primitive {@link PTransform}.
*/
-class UnboundedReadEvaluatorFactory implements RootTransformEvaluatorFactory {
+class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
// Occasionally close an existing reader and resume from checkpoint, to exercise close-and-resume
@VisibleForTesting static final double DEFAULT_READER_REUSE_CHANCE = 0.95;
private final EvaluationContext evaluationContext;
- private final ConcurrentMap<AppliedPTransform<?, ?, ?>, UnboundedReadDeduplicator> deduplicators;
private final double readerReuseChance;
UnboundedReadEvaluatorFactory(EvaluationContext evaluationContext) {
@@ -61,31 +58,9 @@ class UnboundedReadEvaluatorFactory implements RootTransformEvaluatorFactory {
@VisibleForTesting
UnboundedReadEvaluatorFactory(EvaluationContext evaluationContext, double readerReuseChance) {
this.evaluationContext = evaluationContext;
- deduplicators = new ConcurrentHashMap<>();
this.readerReuseChance = readerReuseChance;
}
- @Override
- public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
- return createInitialSplits((AppliedPTransform) transform);
- }
-
- private <OutputT> Collection<CommittedBundle<?>> createInitialSplits(
- AppliedPTransform<?, ?, Read.Unbounded<OutputT>> transform) {
- UnboundedSource<OutputT, ?> source = transform.getTransform().getSource();
- UnboundedReadDeduplicator deduplicator =
- source.requiresDeduping()
- ? UnboundedReadDeduplicator.CachedIdDeduplicator.create()
- : NeverDeduplicator.create();
-
- UnboundedSourceShard<OutputT, ?> shard = UnboundedSourceShard.unstarted(source, deduplicator);
- return Collections.<CommittedBundle<?>>singleton(
- evaluationContext
- .<UnboundedSourceShard<?, ?>>createRootBundle()
- .add(WindowedValue.<UnboundedSourceShard<?, ?>>valueInGlobalWindow(shard))
- .commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
- }
-
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
@Nullable
@@ -269,4 +244,33 @@ class UnboundedReadEvaluatorFactory implements RootTransformEvaluatorFactory {
return of(getSource(), getDeduplicator(), getExistingReader(), newCheckpoint);
}
}
+
+ static class InputProvider implements RootInputProvider {
+ private final EvaluationContext evaluationContext;
+
+ InputProvider(EvaluationContext evaluationContext) {
+ this.evaluationContext = evaluationContext;
+ }
+
+ @Override
+ public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
+ return createInitialSplits((AppliedPTransform) transform);
+ }
+
+ private <OutputT> Collection<CommittedBundle<?>> createInitialSplits(
+ AppliedPTransform<?, ?, Read.Unbounded<OutputT>> transform) {
+ UnboundedSource<OutputT, ?> source = transform.getTransform().getSource();
+ UnboundedReadDeduplicator deduplicator =
+ source.requiresDeduping()
+ ? UnboundedReadDeduplicator.CachedIdDeduplicator.create()
+ : NeverDeduplicator.create();
+
+ UnboundedSourceShard<OutputT, ?> shard = UnboundedSourceShard.unstarted(source, deduplicator);
+ return Collections.<CommittedBundle<?>>singleton(
+ evaluationContext
+ .<UnboundedSourceShard<?, ?>>createRootBundle()
+ .add(WindowedValue.<UnboundedSourceShard<?, ?>>valueInGlobalWindow(shard))
+ .commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
index 8544128..ee17eae 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
@@ -86,7 +86,8 @@ public class BoundedReadEvaluatorFactoryTest {
when(context.createBundle(longs)).thenReturn(outputBundle);
Collection<CommittedBundle<?>> initialInputs =
- factory.getInitialInputs(longs.getProducingTransformInternal());
+ new BoundedReadEvaluatorFactory.InputProvider(context)
+ .getInitialInputs(longs.getProducingTransformInternal());
List<WindowedValue<?>> outputs = new ArrayList<>();
for (CommittedBundle<?> shardBundle : initialInputs) {
TransformEvaluator<?> evaluator =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
index 86d98e9..aa7b178 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
@@ -128,7 +128,8 @@ public class FlattenEvaluatorFactoryTest {
FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory(evaluationContext);
Collection<CommittedBundle<?>> initialInputs =
- factory.getInitialInputs(flattened.getProducingTransformInternal());
+ new EmptyInputProvider(evaluationContext)
+ .getInitialInputs(flattened.getProducingTransformInternal());
TransformEvaluator<Integer> emptyEvaluator =
factory.forApplication(
flattened.getProducingTransformInternal(), Iterables.getOnlyElement(initialInputs));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java
deleted file mode 100644
index e1e24a3..0000000
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.direct;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ExecutionError;
-import com.google.common.util.concurrent.UncheckedExecutionException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link LockedKeyedResourcePool}.
- */
-@RunWith(JUnit4.class)
-public class LockedKeyedResourcePoolTest {
- @Rule public ExpectedException thrown = ExpectedException.none();
- private LockedKeyedResourcePool<String, Integer> cache =
- LockedKeyedResourcePool.create();
-
- @Test
- public void acquireReleaseAcquireLastLoadedOrReleased() throws ExecutionException {
- Optional<Integer> returned = cache.tryAcquire("foo", new Callable<Integer>() {
- @Override
- public Integer call() throws Exception {
- return 3;
- }
- });
- assertThat(returned.get(), equalTo(3));
-
- cache.release("foo", 4);
- Optional<Integer> reacquired = cache.tryAcquire("foo", new Callable<Integer>() {
- @Override
- public Integer call() throws Exception {
- return 5;
- }
- });
- assertThat(reacquired.get(), equalTo(4));
- }
-
- @Test
- public void acquireReleaseReleaseThrows() throws ExecutionException {
- Optional<Integer> returned = cache.tryAcquire("foo", new Callable<Integer>() {
- @Override
- public Integer call() throws Exception {
- return 3;
- }
- });
- assertThat(returned.get(), equalTo(3));
-
- cache.release("foo", 4);
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("already a value present");
- thrown.expectMessage("At most one");
- cache.release("foo", 4);
- }
-
- @Test
- public void releaseBeforeAcquireThrows() {
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("before a value was acquired");
- cache.release("bar", 3);
- }
-
- @Test
- public void multipleAcquireWithoutReleaseAbsent() throws ExecutionException {
- Optional<Integer> returned = cache.tryAcquire("foo", new Callable<Integer>() {
- @Override
- public Integer call() throws Exception {
- return 3;
- }
- });
- Optional<Integer> secondReturned = cache.tryAcquire("foo", new Callable<Integer>() {
- @Override
- public Integer call() throws Exception {
- return 3;
- }
- });
- assertThat(secondReturned.isPresent(), is(false));
- }
-
- @Test
- public void acquireMultipleKeysSucceeds() throws ExecutionException {
- Optional<Integer> returned = cache.tryAcquire("foo", new Callable<Integer>() {
- @Override
- public Integer call() throws Exception {
- return 3;
- }
- });
- Optional<Integer> secondReturned = cache.tryAcquire("bar", new Callable<Integer>() {
- @Override
- public Integer call() throws Exception {
- return 4;
- }
- });
-
- assertThat(returned.get(), equalTo(3));
- assertThat(secondReturned.get(), equalTo(4));
- }
-
- @Test
- public void acquireThrowsExceptionWrapped() throws ExecutionException {
- final Exception cause = new Exception("checkedException");
- thrown.expect(ExecutionException.class);
- thrown.expectCause(equalTo(cause));
- cache.tryAcquire("foo", new Callable<Integer>() {
- @Override
- public Integer call() throws Exception {
- throw cause;
- }
- });
- }
-
- @Test
- public void acquireThrowsRuntimeExceptionWrapped() throws ExecutionException {
- final RuntimeException cause = new RuntimeException("UncheckedException");
- thrown.expect(UncheckedExecutionException.class);
- thrown.expectCause(equalTo(cause));
- cache.tryAcquire("foo", new Callable<Integer>() {
- @Override
- public Integer call() throws Exception {
- throw cause;
- }
- });
- }
-
- @Test
- public void acquireThrowsErrorWrapped() throws ExecutionException {
- final Error cause = new Error("Error");
- thrown.expect(ExecutionError.class);
- thrown.expectCause(equalTo(cause));
- cache.tryAcquire("foo", new Callable<Integer>() {
- @Override
- public Integer call() throws Exception {
- throw cause;
- }
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
index 1790b2d..60b9c79 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
@@ -81,7 +81,8 @@ public class TestStreamEvaluatorFactoryTest {
.thenReturn(bundleFactory.createBundle(streamVals), bundleFactory.createBundle(streamVals));
Collection<CommittedBundle<?>> initialInputs =
- factory.getInitialInputs(streamVals.getProducingTransformInternal());
+ new TestStreamEvaluatorFactory.InputProvider(context)
+ .getInitialInputs(streamVals.getProducingTransformInternal());
@SuppressWarnings("unchecked")
CommittedBundle<TestStreamIndex<Integer>> initialBundle =
(CommittedBundle<TestStreamIndex<Integer>>) Iterables.getOnlyElement(initialInputs);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 25642dd..b78fbeb 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -97,7 +97,8 @@ public class UnboundedReadEvaluatorFactoryTest {
when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
Collection<CommittedBundle<?>> initialInputs =
- factory.getInitialInputs(longs.getProducingTransformInternal());
+ new UnboundedReadEvaluatorFactory.InputProvider(context)
+ .getInitialInputs(longs.getProducingTransformInternal());
CommittedBundle<?> inputShards = Iterables.getOnlyElement(initialInputs);
UnboundedSourceShard<Long, ?> inputShard =
@@ -141,7 +142,8 @@ public class UnboundedReadEvaluatorFactoryTest {
AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
- Collection<CommittedBundle<?>> initialInputs = factory.getInitialInputs(sourceTransform);
+ Collection<CommittedBundle<?>> initialInputs =
+ new UnboundedReadEvaluatorFactory.InputProvider(context).getInitialInputs(sourceTransform);
UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
when(context.createBundle(pcollection)).thenReturn(output);
@@ -196,7 +198,6 @@ public class UnboundedReadEvaluatorFactoryTest {
.commit(Instant.now());
UnboundedReadEvaluatorFactory factory =
new UnboundedReadEvaluatorFactory(context, 1.0 /* Always reuse */);
- factory.getInitialInputs(pcollection.getProducingTransformInternal());
TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator =
factory.forApplication(sourceTransform, inputBundle);
evaluator.processElement(shard);
@@ -239,7 +240,6 @@ public class UnboundedReadEvaluatorFactoryTest {
.commit(Instant.now());
UnboundedReadEvaluatorFactory factory =
new UnboundedReadEvaluatorFactory(context, 0.0 /* never reuse */);
- factory.getInitialInputs(pcollection.getProducingTransformInternal());
TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator =
factory.forApplication(sourceTransform, inputBundle);
evaluator.processElement(shard);