You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/16 02:30:44 UTC
[09/12] incubator-beam git commit: Remove InProcess Prefixes
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutor.java
deleted file mode 100644
index 1cfa544..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutor.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-
-import java.util.Collection;
-
-/**
- * An executor that schedules and executes {@link AppliedPTransform AppliedPTransforms} for both
- * source and intermediate {@link PTransform PTransforms}.
- */
-interface InProcessExecutor {
- /**
- * Starts this executor. The provided collection is the collection of root transforms to
- * initially schedule.
- *
- * @param rootTransforms the root transforms to schedule when this executor starts
- */
- void start(Collection<AppliedPTransform<?, ?, ?>> rootTransforms);
-
- /**
- * Blocks until the job being executed enters a terminal state. A job is completed after all
- * root {@link AppliedPTransform AppliedPTransforms} have completed, and all
- * {@link CommittedBundle Bundles} have been consumed. Jobs may also terminate abnormally.
- *
- * @throws Throwable if an executor thread throws anything; the throwable is transferred to the
- * waiting thread and rethrown there
- */
- void awaitCompletion() throws Throwable;
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupAlsoByWindowEvaluatorFactory.java
deleted file mode 100644
index 53b93d0..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupAlsoByWindowEvaluatorFactory.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
-import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupAlsoByWindow;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
-import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.SystemReduceFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-
-import com.google.common.collect.ImmutableMap;
-
-import java.util.Collections;
-
-/**
- * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the
- * {@link InProcessGroupAlsoByWindow} {@link PTransform}.
- */
-class InProcessGroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
-  /**
-   * Builds the evaluator for {@code application}. The wildcard-typed arguments are passed to the
-   * typed factory method through raw types, so the conversion is unchecked.
-   */
-  @Override
-  @SuppressWarnings({"cast", "unchecked", "rawtypes"})
-  public <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application,
-      CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext) {
-    AppliedPTransform rawApplication = application;
-    CommittedBundle rawBundle = inputBundle;
-    return createEvaluator(rawApplication, rawBundle, evaluationContext);
-  }
-
-  /** Typed counterpart of {@link #forApplication}; creates a new evaluator on every call. */
-  private <K, V> TransformEvaluator<KeyedWorkItem<K, V>> createEvaluator(
-      AppliedPTransform<
-              PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>,
-              InProcessGroupAlsoByWindow<K, V>>
-          application,
-      CommittedBundle<KeyedWorkItem<K, V>> inputBundle,
-      InProcessEvaluationContext evaluationContext) {
-    TransformEvaluator<KeyedWorkItem<K, V>> evaluator =
-        new InProcessGroupAlsoByWindowEvaluator<K, V>(evaluationContext, inputBundle, application);
-    return evaluator;
-  }
-
-  /**
-   * A transform evaluator for the pseudo-primitive {@link InProcessGroupAlsoByWindow}. Every
-   * element, and the end of the bundle, is forwarded to an evaluator created by
-   * {@link ParDoInProcessEvaluator} around a {@link GroupAlsoByWindowViaWindowSetDoFn} that is
-   * configured with the transform's {@link WindowingStrategy}.
-   *
-   * @see GroupByKeyViaGroupByKeyOnly
-   * @see GroupByKeyOnly
-   */
-  private static class InProcessGroupAlsoByWindowEvaluator<K, V>
-      implements TransformEvaluator<KeyedWorkItem<K, V>> {
-
-    /** The evaluator that actually runs the group-also-by-window {@link DoFn}. */
-    private final TransformEvaluator<KeyedWorkItem<K, V>> gabwParDoEvaluator;
-
-    public InProcessGroupAlsoByWindowEvaluator(
-        InProcessEvaluationContext evaluationContext,
-        CommittedBundle<KeyedWorkItem<K, V>> inputBundle,
-        AppliedPTransform<
-                PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>,
-                InProcessGroupAlsoByWindow<K, V>>
-            application) {
-      InProcessGroupAlsoByWindow<K, V> transform = application.getTransform();
-
-      // The element coder comes from the coder of the PCollection the input bundle belongs to.
-      Coder<V> elementCoder = transform.getValueCoder(inputBundle.getPCollection().getCoder());
-
-      @SuppressWarnings("unchecked")
-      WindowingStrategy<?, BoundedWindow> strategy =
-          (WindowingStrategy<?, BoundedWindow>) transform.getWindowingStrategy();
-
-      DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> groupingFn =
-          GroupAlsoByWindowViaWindowSetDoFn.create(
-              strategy, SystemReduceFn.<K, V, BoundedWindow>buffering(elementCoder));
-
-      TupleTag<KV<K, Iterable<V>>> outputTag = new TupleTag<KV<K, Iterable<V>>>() {};
-      ImmutableMap<TupleTag<?>, PCollection<?>> outputsByTag =
-          ImmutableMap.<TupleTag<?>, PCollection<?>>of(outputTag, application.getOutput());
-
-      // Not technically legit, as the application is not a ParDo; there are no side inputs and
-      // no side outputs.
-      this.gabwParDoEvaluator =
-          ParDoInProcessEvaluator.create(
-              evaluationContext,
-              inputBundle,
-              application,
-              groupingFn,
-              Collections.<PCollectionView<?>>emptyList(),
-              outputTag,
-              Collections.<TupleTag<?>>emptyList(),
-              outputsByTag);
-    }
-
-    /** Forwards {@code element} to the wrapped evaluator. */
-    @Override
-    public void processElement(WindowedValue<KeyedWorkItem<K, V>> element) throws Exception {
-      gabwParDoEvaluator.processElement(element);
-    }
-
-    /** Finishes the wrapped evaluator's bundle and returns its result. */
-    @Override
-    public InProcessTransformResult finishBundle() throws Exception {
-      return gabwParDoEvaluator.finishBundle();
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKey.java
deleted file mode 100644
index 026b4d5..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKey.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.direct;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItemCoder;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * A {@link ForwardingPTransform} wrapping a {@link GroupByKey}. The original transform is
- * exposed through {@link #delegate()}, while {@code apply} expands into
- * {@link ReifyTimestampsAndWindows}, {@link InProcessGroupByKeyOnly} and
- * {@link InProcessGroupAlsoByWindow}.
- */
-class InProcessGroupByKey<K, V>
-    extends ForwardingPTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
-  private final GroupByKey<K, V> original;
-
-  InProcessGroupByKey(GroupByKey<K, V> from) {
-    this.original = from;
-  }
-
-  /** Returns the {@link GroupByKey} this transform was created from. */
-  @Override
-  public PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> delegate() {
-    return original;
-  }
-
-  /**
-   * Expands the group-by-key into lower-level steps and sets the coder and windowing strategy of
-   * the result.
-   *
-   * @param input the keyed input; its coder is cast to {@link KvCoder} without a prior check
-   * @return the grouped output, coded as a {@link KvCoder} of key and {@link IterableCoder}
-   */
-  @Override
-  public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
-    @SuppressWarnings("unchecked")
-    KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder();
-
-    // This operation groups by the combination of key and window,
-    // merging windows as needed, using the windows assigned to the
-    // key/value input elements and the window merge operation of the
-    // window function associated with the input PCollection.
-    WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
-
-    // By default, implement GroupByKey via a series of lower-level operations.
-    return input
-        // Make each input element's timestamp and assigned windows
-        // explicit, in the value part.
-        .apply(new ReifyTimestampsAndWindows<K, V>())
-        .apply(new InProcessGroupByKeyOnly<K, V>())
-        .setCoder(
-            KeyedWorkItemCoder.of(
-                inputCoder.getKeyCoder(),
-                inputCoder.getValueCoder(),
-                windowingStrategy.getWindowFn().windowCoder()))
-
-        // Group each key's values by window, merging windows as needed.
-        .apply("GroupAlsoByWindow", new InProcessGroupAlsoByWindow<K, V>(windowingStrategy))
-
-        // And update the windowing strategy as appropriate.
-        .setWindowingStrategyInternal(original.updateWindowingStrategy(windowingStrategy))
-        .setCoder(
-            KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder())));
-  }
-
-  /**
-   * The first step of the expansion: turns keyed, reified values into
-   * {@link KeyedWorkItem KeyedWorkItems}. Applying it only creates the primitive output
-   * {@link PCollection}.
-   */
-  static final class InProcessGroupByKeyOnly<K, V>
-      extends PTransform<PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>> {
-    /** Creates the output with the input's pipeline, windowing strategy and boundedness. */
-    @Override
-    public PCollection<KeyedWorkItem<K, V>> apply(PCollection<KV<K, WindowedValue<V>>> input) {
-      return PCollection.<KeyedWorkItem<K, V>>createPrimitiveOutputInternal(
-          input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
-    }
-
-    InProcessGroupByKeyOnly() {}
-  }
-
-  /**
-   * The second step of the expansion: turns {@link KeyedWorkItem KeyedWorkItems} into one
-   * {@code KV<K, Iterable<V>>} per group. It carries the {@link WindowingStrategy} it was
-   * constructed with, and applying it only creates the primitive output {@link PCollection}.
-   */
-  static final class InProcessGroupAlsoByWindow<K, V>
-      extends PTransform<PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>> {
-
-    private final WindowingStrategy<?, ?> windowingStrategy;
-
-    public InProcessGroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) {
-      this.windowingStrategy = windowingStrategy;
-    }
-
-    /** Returns the windowing strategy supplied at construction. */
-    public WindowingStrategy<?, ?> getWindowingStrategy() {
-      return windowingStrategy;
-    }
-
-    /**
-     * Narrows {@code inputCoder} to a {@link KeyedWorkItemCoder}.
-     *
-     * @throws IllegalArgumentException if {@code inputCoder} is not a {@link KeyedWorkItemCoder}
-     */
-    private KeyedWorkItemCoder<K, V> getKeyedWorkItemCoder(Coder<KeyedWorkItem<K, V>> inputCoder) {
-      // Coder<KeyedWorkItem<...>> --> KeyedWorkItemCoder<...>
-      checkArgument(
-          inputCoder instanceof KeyedWorkItemCoder,
-          "%s requires a %s<...> but got %s",
-          getClass().getSimpleName(),
-          // Name the coder type that is actually required, so the message matches the check.
-          KeyedWorkItemCoder.class.getSimpleName(),
-          inputCoder);
-      @SuppressWarnings("unchecked")
-      KeyedWorkItemCoder<K, V> keyedWorkItemCoder = (KeyedWorkItemCoder<K, V>) inputCoder;
-      return keyedWorkItemCoder;
-    }
-
-    /**
-     * Returns the key coder of {@code inputCoder}.
-     *
-     * @throws IllegalArgumentException if {@code inputCoder} is not a {@link KeyedWorkItemCoder}
-     */
-    public Coder<K> getKeyCoder(Coder<KeyedWorkItem<K, V>> inputCoder) {
-      return getKeyedWorkItemCoder(inputCoder).getKeyCoder();
-    }
-
-    /**
-     * Returns the element coder of {@code inputCoder}.
-     *
-     * @throws IllegalArgumentException if {@code inputCoder} is not a {@link KeyedWorkItemCoder}
-     */
-    public Coder<V> getValueCoder(Coder<KeyedWorkItem<K, V>> inputCoder) {
-      return getKeyedWorkItemCoder(inputCoder).getElementCoder();
-    }
-
-    /** Creates the output with the input's pipeline, windowing strategy and boundedness. */
-    @Override
-    public PCollection<KV<K, Iterable<V>>> apply(PCollection<KeyedWorkItem<K, V>> input) {
-      return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
-          input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java
deleted file mode 100644
index 3604bca..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
-import org.apache.beam.runners.direct.StepTransformResult.Builder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
-import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItems;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the
- * {@link InProcessGroupByKeyOnly} {@link PTransform}.
- */
-class InProcessGroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory {
-  /**
-   * Builds the evaluator for {@code application}. The wildcard-typed arguments are passed to the
-   * typed factory method through raw types, so the conversion is unchecked.
-   */
-  @Override
-  @SuppressWarnings({"cast", "unchecked", "rawtypes"})
-  public <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application,
-      CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext) {
-    AppliedPTransform rawApplication = application;
-    CommittedBundle rawBundle = inputBundle;
-    return createEvaluator(rawApplication, rawBundle, evaluationContext);
-  }
-
-  /** Typed counterpart of {@link #forApplication}; creates a new evaluator on every call. */
-  private <K, V> TransformEvaluator<KV<K, WindowedValue<V>>> createEvaluator(
-      AppliedPTransform<
-              PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>,
-              InProcessGroupByKeyOnly<K, V>>
-          application,
-      CommittedBundle<KV<K, WindowedValue<V>>> inputBundle,
-      InProcessEvaluationContext evaluationContext) {
-    TransformEvaluator<KV<K, WindowedValue<V>>> evaluator =
-        new InProcessGroupByKeyOnlyEvaluator<K, V>(evaluationContext, inputBundle, application);
-    return evaluator;
-  }
-
-  /**
-   * A transform evaluator for the pseudo-primitive {@link GroupByKeyOnly}. While a bundle is
-   * processed, values are bucketed by the encoded form of their key. When the bundle finishes,
-   * each bucket is emitted as a single {@link KeyedWorkItem}, in the global window, in its own
-   * keyed bundle.
-   *
-   * @see GroupByKeyViaGroupByKeyOnly
-   */
-  private static class InProcessGroupByKeyOnlyEvaluator<K, V>
-      implements TransformEvaluator<KV<K, WindowedValue<V>>> {
-    private final InProcessEvaluationContext evaluationContext;
-
-    private final CommittedBundle<KV<K, WindowedValue<V>>> inputBundle;
-    private final AppliedPTransform<
-            PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>,
-            InProcessGroupByKeyOnly<K, V>>
-        application;
-    private final Coder<K> keyCoder;
-    /** Values seen so far in this bundle, bucketed by encoded key. */
-    private final Map<GroupingKey<K>, List<WindowedValue<V>>> groupingMap;
-
-    public InProcessGroupByKeyOnlyEvaluator(
-        InProcessEvaluationContext evaluationContext,
-        CommittedBundle<KV<K, WindowedValue<V>>> inputBundle,
-        AppliedPTransform<
-                PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>,
-                InProcessGroupByKeyOnly<K, V>>
-            application) {
-      this.evaluationContext = evaluationContext;
-      this.inputBundle = inputBundle;
-      this.application = application;
-      this.keyCoder = getKeyCoder(application.getInput().getCoder());
-      this.groupingMap = new HashMap<>();
-    }
-
-    /**
-     * Extracts the key coder from the input coder.
-     *
-     * @throws IllegalStateException if {@code coder} is not a {@link KvCoder}
-     */
-    private Coder<K> getKeyCoder(Coder<KV<K, WindowedValue<V>>> coder) {
-      checkState(
-          coder instanceof KvCoder,
-          "%s requires a coder of class %s."
-              + " This is an internal error; this is checked during pipeline construction"
-              + " but became corrupted.",
-          getClass().getSimpleName(),
-          KvCoder.class.getSimpleName());
-      @SuppressWarnings("unchecked")
-      KvCoder<K, WindowedValue<V>> kvCoder = (KvCoder<K, WindowedValue<V>>) coder;
-      return kvCoder.getKeyCoder();
-    }
-
-    /**
-     * Adds the element's value to the bucket for the element's key, creating the bucket on first
-     * use.
-     *
-     * @throws IllegalArgumentException if the key cannot be encoded with the key coder
-     */
-    @Override
-    public void processElement(WindowedValue<KV<K, WindowedValue<V>>> element) {
-      KV<K, WindowedValue<V>> keyedValue = element.getValue();
-      K key = keyedValue.getKey();
-      GroupingKey<K> bucketKey = new GroupingKey<>(key, encodeKey(key));
-
-      List<WindowedValue<V>> bucket = groupingMap.get(bucketKey);
-      if (bucket == null) {
-        bucket = new ArrayList<>();
-        groupingMap.put(bucketKey, bucket);
-      }
-      bucket.add(keyedValue.getValue());
-    }
-
-    /** Encodes {@code key} with the key coder, wrapping any {@link CoderException}. */
-    private byte[] encodeKey(K key) {
-      try {
-        return encodeToByteArray(keyCoder, key);
-      } catch (CoderException exn) {
-        // TODO: Put in better element printing:
-        // truncate if too long.
-        throw new IllegalArgumentException(
-            String.format("unable to encode key %s of input to %s using %s", key, this, keyCoder),
-            exn);
-      }
-    }
-
-    /**
-     * Emits one keyed bundle per bucket, each holding a single {@link KeyedWorkItem} in the
-     * global window, and returns a result without a hold.
-     */
-    @Override
-    public InProcessTransformResult finishBundle() {
-      Builder result = StepTransformResult.withoutHold(application);
-      for (Map.Entry<GroupingKey<K>, List<WindowedValue<V>>> bucket : groupingMap.entrySet()) {
-        K key = bucket.getKey().key;
-        KeyedWorkItem<K, V> workItem = KeyedWorkItems.elementsWorkItem(key, bucket.getValue());
-        UncommittedBundle<KeyedWorkItem<K, V>> keyedBundle =
-            evaluationContext.createKeyedBundle(
-                inputBundle, StructuralKey.of(key, keyCoder), application.getOutput());
-        keyedBundle.add(WindowedValue.valueInGlobalWindow(workItem));
-        result.addOutput(keyedBundle);
-      }
-      return result.build();
-    }
-
-    /**
-     * A map key pairing a key with its encoded bytes. Equality and hashing use only the encoded
-     * bytes.
-     */
-    private static class GroupingKey<K> {
-      private final K key;
-      private final byte[] encodedKey;
-
-      public GroupingKey(K key, byte[] encodedKey) {
-        this.key = key;
-        this.encodedKey = encodedKey;
-      }
-
-      @Override
-      public boolean equals(Object o) {
-        if (!(o instanceof GroupingKey)) {
-          return false;
-        }
-        GroupingKey<?> that = (GroupingKey<?>) o;
-        return Arrays.equals(this.encodedKey, that.encodedKey);
-      }
-
-      @Override
-      public int hashCode() {
-        return Arrays.hashCode(encodedKey);
-      }
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOverrideFactory.java
deleted file mode 100644
index 1d84bc9..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOverrideFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-
-/**
- * A {@link PTransformOverrideFactory} that replaces {@link GroupByKey} PTransforms with an
- * {@link InProcessGroupByKey} and returns every other transform unchanged.
- */
-final class InProcessGroupByKeyOverrideFactory implements PTransformOverrideFactory {
-  @Override
-  public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
-      PTransform<InputT, OutputT> transform) {
-    if (!(transform instanceof GroupByKey)) {
-      // Nothing to override.
-      return transform;
-    }
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    PTransform<InputT, OutputT> replacement =
-        (PTransform) new InProcessGroupByKey((GroupByKey) transform);
-    return replacement;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
deleted file mode 100644
index f53f590..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.PCollectionViewWindow;
-import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
-import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PCollectionView;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Optional;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.annotation.Nullable;
-
-/**
- * An in-process container for {@link PCollectionView PCollectionViews}, which provides methods for
- * constructing {@link SideInputReader SideInputReaders} that report whether a side input is
- * ready before it is read, and for writing to a {@link PCollectionView}.
- */
-class InProcessSideInputContainer {
- private final Collection<PCollectionView<?>> containedViews;
- private final LoadingCache<
- PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>>
- viewByWindows;
-
- /**
- * Create a new {@link InProcessSideInputContainer} with the provided views and the provided
- * context.
- *
- * <p>The context is used, the first time a view and window pair is looked up, to schedule a
- * callback that sets the contents of that pair to an empty list if nothing has been written
- * to it by the time the callback runs.
- */
- public static InProcessSideInputContainer create(
- final InProcessEvaluationContext context, Collection<PCollectionView<?>> containedViews) {
- LoadingCache<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>>
- viewByWindows = CacheBuilder.newBuilder().build(new CallbackSchedulingLoader(context));
- return new InProcessSideInputContainer(containedViews, viewByWindows);
- }
-
- private InProcessSideInputContainer(
- Collection<PCollectionView<?>> containedViews,
- LoadingCache<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>>
- viewByWindows) {
- this.containedViews = ImmutableSet.copyOf(containedViews);
- this.viewByWindows = viewByWindows;
- }
-
- /**
- * Return a {@link ReadyCheckingSideInputReader} over this {@link InProcessSideInputContainer}
- * that is restricted to the views in the provided argument.
- *
- * <p>The reader looks contents up in this container, but keeps the first result it loads for
- * each view and window in its own cache, so a write made after that lookup may not be visible
- * through the same reader.
- *
- * @throws IllegalArgumentException if any requested view is not contained in this container
- */
- public ReadyCheckingSideInputReader createReaderForViews(
- Collection<PCollectionView<?>> newContainedViews) {
- if (!containedViews.containsAll(newContainedViews)) {
- Set<PCollectionView<?>> currentlyContained = ImmutableSet.copyOf(containedViews);
- Set<PCollectionView<?>> newRequested = ImmutableSet.copyOf(newContainedViews);
- throw new IllegalArgumentException("Can't create a SideInputReader with unknown views "
- + Sets.difference(newRequested, currentlyContained));
- }
- return new SideInputContainerSideInputReader(newContainedViews);
- }
-
- /**
- * Write the provided values to the provided view.
- *
- * <p>The windowed values are first indexed by every window they appear in. For each window, if
- * nothing has been written yet, or if the pane index of the new values is greater than the pane
- * index of the values currently stored within this container, all of the values for that
- * window are written to the container as the new values of the {@link PCollectionView}. The
- * pane of a window's values is taken from the first value for that window.
- *
- * <p>The provided iterable is expected to contain only a single window and pane.
- */
- public void write(PCollectionView<?> view, Iterable<? extends WindowedValue<?>> values) {
- Map<BoundedWindow, Collection<WindowedValue<?>>> valuesPerWindow =
- indexValuesByWindow(values);
- for (Map.Entry<BoundedWindow, Collection<WindowedValue<?>>> windowValues :
- valuesPerWindow.entrySet()) {
- updatePCollectionViewWindowValues(view, windowValues.getKey(), windowValues.getValue());
- }
- }
-
- /**
- * Index the provided values by all {@link BoundedWindow windows} in which they appear. A value
- * that is in several windows is added, unmodified, to the collection of each of them.
- */
- private Map<BoundedWindow, Collection<WindowedValue<?>>> indexValuesByWindow(
- Iterable<? extends WindowedValue<?>> values) {
- Map<BoundedWindow, Collection<WindowedValue<?>>> valuesPerWindow = new HashMap<>();
- for (WindowedValue<?> value : values) {
- for (BoundedWindow window : value.getWindows()) {
- Collection<WindowedValue<?>> windowValues = valuesPerWindow.get(window);
- if (windowValues == null) {
- windowValues = new ArrayList<>();
- valuesPerWindow.put(window, windowValues);
- }
- windowValues.add(value);
- }
- }
- return valuesPerWindow;
- }
-
- /**
- * Set the value of the {@link PCollectionView} in the {@link BoundedWindow} to be based on the
- * specified values, if the values are part of a later pane than currently exist within the
- * {@link PCollectionViewWindow}.
- *
- * <p>The update is a compare-and-set loop: it retries while the new pane index is still greater
- * than the stored one and another writer replaced the stored values in between. Stored
- * contents that are empty are treated as pane index -1, so any new pane replaces them.
- */
- private void updatePCollectionViewWindowValues(
- PCollectionView<?> view, BoundedWindow window, Collection<WindowedValue<?>> windowValues) {
- PCollectionViewWindow<?> windowedView = PCollectionViewWindow.of(view, window);
- AtomicReference<Iterable<? extends WindowedValue<?>>> contents =
- viewByWindows.getUnchecked(windowedView);
- if (contents.compareAndSet(null, windowValues)) {
- // the value had never been set, so we set it and are done.
- return;
- }
- PaneInfo newPane = windowValues.iterator().next().getPane();
-
- Iterable<? extends WindowedValue<?>> existingValues;
- long existingPane;
- do {
- existingValues = contents.get();
- existingPane =
- Iterables.isEmpty(existingValues)
- ? -1L
- : existingValues.iterator().next().getPane().getIndex();
- } while (newPane.getIndex() > existingPane
- && !contents.compareAndSet(existingValues, windowValues));
- }
-
- private static class CallbackSchedulingLoader extends
- CacheLoader<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>> {
- private final InProcessEvaluationContext context;
-
- public CallbackSchedulingLoader(
- InProcessEvaluationContext context) {
- this.context = context;
- }
-
- @Override
- public AtomicReference<Iterable<? extends WindowedValue<?>>>
- load(PCollectionViewWindow<?> view) {
-
- AtomicReference<Iterable<? extends WindowedValue<?>>> contents = new AtomicReference<>();
- WindowingStrategy<?, ?> windowingStrategy = view.getView().getWindowingStrategyInternal();
-
- context.scheduleAfterOutputWouldBeProduced(view.getView(),
- view.getWindow(),
- windowingStrategy,
- new WriteEmptyViewContents(view.getView(), view.getWindow(), contents));
- return contents;
- }
- }
-
- private static class WriteEmptyViewContents implements Runnable {
- private final PCollectionView<?> view;
- private final BoundedWindow window;
- private final AtomicReference<Iterable<? extends WindowedValue<?>>> contents;
-
- private WriteEmptyViewContents(PCollectionView<?> view, BoundedWindow window,
- AtomicReference<Iterable<? extends WindowedValue<?>>> contents) {
- this.contents = contents;
- this.view = view;
- this.window = window;
- }
-
- @Override
- public void run() {
- // The requested window has closed without producing elements, so reflect that in
- // the PCollectionView. If the contents have already been set, this does nothing.
- contents.compareAndSet(null, Collections.<WindowedValue<?>>emptyList());
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("view", view)
- .add("window", window)
- .toString();
- }
- }
-
- private final class SideInputContainerSideInputReader implements ReadyCheckingSideInputReader {
- private final Collection<PCollectionView<?>> readerViews;
- private final LoadingCache<
- PCollectionViewWindow<?>, Optional<? extends Iterable<? extends WindowedValue<?>>>>
- viewContents;
-
- private SideInputContainerSideInputReader(Collection<PCollectionView<?>> readerViews) {
- this.readerViews = ImmutableSet.copyOf(readerViews);
- this.viewContents = CacheBuilder.newBuilder().build(new CurrentViewContentsLoader());
- }
-
- @Override
- public boolean isReady(final PCollectionView<?> view, final BoundedWindow window) {
- checkArgument(
- readerViews.contains(view),
- "Tried to check if view %s was ready in a SideInputReader that does not contain it. "
- + "Contained views; %s",
- view,
- readerViews);
- return viewContents.getUnchecked(PCollectionViewWindow.of(view, window)).isPresent();
- }
-
- @Override
- @Nullable
- public <T> T get(final PCollectionView<T> view, final BoundedWindow window) {
- checkArgument(readerViews.contains(view),
- "call to get(PCollectionView) with unknown view: %s",
- view);
- checkArgument(
- isReady(view, window),
- "calling get() on PCollectionView %s that is not ready in window %s",
- view,
- window);
- // Safe covariant cast: the values are only read, and isReady() above established that the
- // cached optional is present.
- @SuppressWarnings("unchecked") Iterable<WindowedValue<?>> values =
- (Iterable<WindowedValue<?>>) viewContents.getUnchecked(PCollectionViewWindow.of(view,
- window)).get();
- return view.fromIterableInternal(values);
- }
-
- @Override
- public <T> boolean contains(PCollectionView<T> view) {
- return readerViews.contains(view);
- }
-
- @Override
- public boolean isEmpty() {
- return readerViews.isEmpty();
- }
- }
-
- /**
- * A {@link CacheLoader} that loads the current contents of a {@link PCollectionViewWindow} into
- * an optional. The optional is absent when the container's reference for that view and window
- * has not been set yet.
- */
- private class CurrentViewContentsLoader extends CacheLoader<
- PCollectionViewWindow<?>, Optional<? extends Iterable<? extends WindowedValue<?>>>> {
-
- @Override
- public Optional<? extends Iterable<? extends WindowedValue<?>>>
- load(PCollectionViewWindow<?> key) {
- return Optional.fromNullable(viewByWindows.getUnchecked(key).get());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTimerInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTimerInternals.java
deleted file mode 100644
index cd54f59..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTimerInternals.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
-import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate.TimerUpdateBuilder;
-import org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks;
-import org.apache.beam.sdk.util.TimerInternals;
-
-import org.joda.time.Instant;
-
-import javax.annotation.Nullable;
-
-/**
- * An implementation of {@link TimerInternals} where all relevant data exists in memory.
- */
-public class InProcessTimerInternals implements TimerInternals {
- private final Clock processingTimeClock;
- private final TransformWatermarks watermarks;
- private final TimerUpdateBuilder timerUpdateBuilder;
-
- public static InProcessTimerInternals create(
- Clock clock, TransformWatermarks watermarks, TimerUpdateBuilder timerUpdateBuilder) {
- return new InProcessTimerInternals(clock, watermarks, timerUpdateBuilder);
- }
-
- private InProcessTimerInternals(
- Clock clock, TransformWatermarks watermarks, TimerUpdateBuilder timerUpdateBuilder) {
- this.processingTimeClock = clock;
- this.watermarks = watermarks;
- this.timerUpdateBuilder = timerUpdateBuilder;
- }
-
- @Override
- public void setTimer(TimerData timerKey) {
- timerUpdateBuilder.setTimer(timerKey);
- }
-
- @Override
- public void deleteTimer(TimerData timerKey) {
- timerUpdateBuilder.deletedTimer(timerKey);
- }
-
- public TimerUpdate getTimerUpdate() {
- return timerUpdateBuilder.build();
- }
-
- @Override
- public Instant currentProcessingTime() {
- return processingTimeClock.now();
- }
-
- @Override
- @Nullable
- public Instant currentSynchronizedProcessingTime() {
- return watermarks.getSynchronizedProcessingInputTime();
- }
-
- @Override
- public Instant currentInputWatermarkTime() {
- return watermarks.getInputWatermark();
- }
-
- @Override
- @Nullable
- public Instant currentOutputWatermarkTime() {
- return watermarks.getOutputWatermark();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
deleted file mode 100644
index 92127b4..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.common.CounterSet;
-import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
-
-import org.joda.time.Instant;
-
-import javax.annotation.Nullable;
-
-/**
- * The result of evaluating an {@link AppliedPTransform} with a {@link TransformEvaluator}.
- */
-public interface InProcessTransformResult {
- /**
- * Returns the {@link AppliedPTransform} that produced this result.
- */
- AppliedPTransform<?, ?, ?> getTransform();
-
- /**
- * Returns the {@link UncommittedBundle (uncommitted) Bundles} output by this transform. These
- * will be committed by the evaluation context as part of completing this result.
- */
- Iterable<? extends UncommittedBundle<?>> getOutputBundles();
-
- /**
- * Returns elements that were provided to the {@link TransformEvaluator} as input but were not
- * processed.
- */
- Iterable<? extends WindowedValue<?>> getUnprocessedElements();
-
- /**
- * Returns the {@link CounterSet} used by this {@link PTransform}, or null if this transform did
- * not use a {@link CounterSet}.
- */
- @Nullable CounterSet getCounters();
-
- /**
- * Returns the Watermark Hold for the transform at the time this result was produced.
- *
- * If the transform does not set any watermark hold, returns
- * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
- */
- Instant getWatermarkHold();
-
- /**
- * Returns the State used by the transform.
- *
- * If this evaluation did not access state, this may return null.
- */
- @Nullable
- CopyOnAccessInMemoryStateInternals<?> getState();
-
- /**
- * Returns a TimerUpdateBuilder that was produced as a result of this evaluation. If the
- * evaluation was triggered due to the delivery of one or more timers, those timers must be added
- * to the builder before it is complete.
- *
- * <p>If this evaluation did not add or remove any timers, returns an empty TimerUpdate.
- */
- TimerUpdate getTimerUpdate();
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
index 758ee24..074619a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.values.PCollection;
* {@link ModelEnforcement} is provided with the input bundle as part of
* {@link ModelEnforcementFactory#forBundle(CommittedBundle, AppliedPTransform)}, each element
* before and after that element is provided to an underlying {@link TransformEvaluator}, and the
- * output {@link InProcessTransformResult} and committed output bundles after the
+ * output {@link TransformResult} and committed output bundles after the
* {@link TransformEvaluator} has completed.
*
* <p>Typically, {@link ModelEnforcement} will obtain required metadata (such as the {@link Coder}
@@ -53,11 +53,11 @@ public interface ModelEnforcement<T> {
/**
* Called after a bundle has been completed and {@link TransformEvaluator#finishBundle()} has been
- * called, producing the provided {@link InProcessTransformResult} and
+ * called, producing the provided {@link TransformResult} and
* {@link CommittedBundle output bundles}.
*/
void afterFinish(
CommittedBundle<T> input,
- InProcessTransformResult result,
+ TransformResult result,
Iterable<? extends CommittedBundle<?>> outputs);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
new file mode 100644
index 0000000..58cee4d
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.DoFnRunner;
+import org.apache.beam.sdk.util.DoFnRunners;
+import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
+import org.apache.beam.sdk.util.PushbackSideInputDoFnRunner;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+class ParDoEvaluator<T> implements TransformEvaluator<T> { // evaluator backed by a PushbackSideInputDoFnRunner
+ public static <InputT, OutputT> ParDoEvaluator<InputT> create(
+ EvaluationContext evaluationContext,
+ CommittedBundle<InputT> inputBundle,
+ AppliedPTransform<PCollection<InputT>, ?, ?> application,
+ DoFn<InputT, OutputT> fn,
+ List<PCollectionView<?>> sideInputs,
+ TupleTag<OutputT> mainOutputTag,
+ List<TupleTag<?>> sideOutputTags,
+ Map<TupleTag<?>, PCollection<?>> outputs) {
+ DirectExecutionContext executionContext =
+ evaluationContext.getExecutionContext(application, inputBundle.getKey());
+ String stepName = evaluationContext.getStepName(application);
+ DirectStepContext stepContext =
+ executionContext.getOrCreateStepContext(stepName, stepName);
+
+ CounterSet counters = evaluationContext.createCounterSet();
+
+ Map<TupleTag<?>, UncommittedBundle<?>> outputBundles = new HashMap<>(); // one bundle per entry of 'outputs'
+ for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
+ outputBundles.put(
+ outputEntry.getKey(),
+ evaluationContext.createBundle(inputBundle, outputEntry.getValue()));
+ }
+
+ ReadyCheckingSideInputReader sideInputReader =
+ evaluationContext.createSideInputReader(sideInputs);
+ DoFnRunner<InputT, OutputT> underlying =
+ DoFnRunners.createDefault(
+ evaluationContext.getPipelineOptions(),
+ fn,
+ sideInputReader,
+ BundleOutputManager.create(outputBundles),
+ mainOutputTag,
+ sideOutputTags,
+ stepContext,
+ counters.getAddCounterMutator(),
+ application.getInput().getWindowingStrategy());
+ PushbackSideInputDoFnRunner<InputT, OutputT> runner =
+ PushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
+
+ try {
+ runner.startBundle(); // the bundle is started here, before the evaluator is returned to the caller
+ } catch (Exception e) {
+ throw UserCodeException.wrap(e);
+ }
+
+ return new ParDoEvaluator<>(
+ runner, application, counters, outputBundles.values(), stepContext);
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////
+
+ private final PushbackSideInputDoFnRunner<T, ?> fnRunner;
+ private final AppliedPTransform<PCollection<T>, ?, ?> transform;
+ private final CounterSet counters;
+ private final Collection<UncommittedBundle<?>> outputBundles;
+ private final DirectStepContext stepContext;
+
+ private final ImmutableList.Builder<WindowedValue<T>> unprocessedElements; // appended to in processElement, built in finishBundle
+
+ private ParDoEvaluator(
+ PushbackSideInputDoFnRunner<T, ?> fnRunner,
+ AppliedPTransform<PCollection<T>, ?, ?> transform,
+ CounterSet counters,
+ Collection<UncommittedBundle<?>> outputBundles,
+ DirectStepContext stepContext) {
+ this.fnRunner = fnRunner;
+ this.transform = transform;
+ this.counters = counters;
+ this.outputBundles = outputBundles;
+ this.stepContext = stepContext;
+
+ this.unprocessedElements = ImmutableList.builder();
+ }
+
+ @Override
+ public void processElement(WindowedValue<T> element) {
+ try {
+ Iterable<WindowedValue<T>> unprocessed = fnRunner.processElementInReadyWindows(element);
+ unprocessedElements.addAll(unprocessed); // whatever the runner hands back is reported by finishBundle
+ } catch (Exception e) {
+ throw UserCodeException.wrap(e);
+ }
+ }
+
+ @Override
+ public TransformResult finishBundle() {
+ try {
+ fnRunner.finishBundle();
+ } catch (Exception e) {
+ throw UserCodeException.wrap(e);
+ }
+ StepTransformResult.Builder resultBuilder;
+ CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState(); // a null result is handled below
+ if (state != null) {
+ resultBuilder =
+ StepTransformResult.withHold(transform, state.getEarliestWatermarkHold())
+ .withState(state);
+ } else {
+ resultBuilder = StepTransformResult.withoutHold(transform); // no committed state, so no hold is taken from it
+ }
+ return resultBuilder
+ .addOutput(outputBundles)
+ .withTimerUpdate(stepContext.getTimerUpdate())
+ .withCounters(counters)
+ .addUnprocessedElements(unprocessedElements.build())
+ .build();
+ }
+
+ static class BundleOutputManager implements OutputManager {
+ private final Map<TupleTag<?>, UncommittedBundle<?>> bundles;
+ private final Map<TupleTag<?>, List<?>> undeclaredOutputs; // written by output(); nothing in this class reads it back
+
+ public static BundleOutputManager create(Map<TupleTag<?>, UncommittedBundle<?>> outputBundles) {
+ return new BundleOutputManager(outputBundles);
+ }
+
+ private BundleOutputManager(Map<TupleTag<?>, UncommittedBundle<?>> bundles) {
+ this.bundles = bundles;
+ undeclaredOutputs = new HashMap<>();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+ @SuppressWarnings("rawtypes")
+ UncommittedBundle bundle = bundles.get(tag);
+ if (bundle == null) { // no bundle was registered for this tag: buffer the value instead
+ List undeclaredContents = undeclaredOutputs.get(tag);
+ if (undeclaredContents == null) {
+ undeclaredContents = new ArrayList<T>();
+ undeclaredOutputs.put(tag, undeclaredContents);
+ }
+ undeclaredContents.add(output);
+ } else {
+ bundle.add(output);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java
deleted file mode 100644
index b9f4808..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.runners.direct.InProcessExecutionContext.InProcessStepContext;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.DoFnRunner;
-import org.apache.beam.sdk.util.DoFnRunners;
-import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
-import org.apache.beam.sdk.util.PushbackSideInputDoFnRunner;
-import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
-import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.common.CounterSet;
-import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-
-import com.google.common.collect.ImmutableList;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-class ParDoInProcessEvaluator<T> implements TransformEvaluator<T> {
- public static <InputT, OutputT> ParDoInProcessEvaluator<InputT> create(
- InProcessEvaluationContext evaluationContext,
- CommittedBundle<InputT> inputBundle,
- AppliedPTransform<PCollection<InputT>, ?, ?> application,
- DoFn<InputT, OutputT> fn,
- List<PCollectionView<?>> sideInputs,
- TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> sideOutputTags,
- Map<TupleTag<?>, PCollection<?>> outputs) {
- InProcessExecutionContext executionContext =
- evaluationContext.getExecutionContext(application, inputBundle.getKey());
- String stepName = evaluationContext.getStepName(application);
- InProcessStepContext stepContext =
- executionContext.getOrCreateStepContext(stepName, stepName);
-
- CounterSet counters = evaluationContext.createCounterSet();
-
- Map<TupleTag<?>, UncommittedBundle<?>> outputBundles = new HashMap<>();
- for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
- outputBundles.put(
- outputEntry.getKey(),
- evaluationContext.createBundle(inputBundle, outputEntry.getValue()));
- }
-
- ReadyCheckingSideInputReader sideInputReader =
- evaluationContext.createSideInputReader(sideInputs);
- DoFnRunner<InputT, OutputT> underlying =
- DoFnRunners.createDefault(
- evaluationContext.getPipelineOptions(),
- fn,
- sideInputReader,
- BundleOutputManager.create(outputBundles),
- mainOutputTag,
- sideOutputTags,
- stepContext,
- counters.getAddCounterMutator(),
- application.getInput().getWindowingStrategy());
- PushbackSideInputDoFnRunner<InputT, OutputT> runner =
- PushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
-
- try {
- runner.startBundle();
- } catch (Exception e) {
- throw UserCodeException.wrap(e);
- }
-
- return new ParDoInProcessEvaluator<>(
- runner, application, counters, outputBundles.values(), stepContext);
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////////
-
- private final PushbackSideInputDoFnRunner<T, ?> fnRunner;
- private final AppliedPTransform<PCollection<T>, ?, ?> transform;
- private final CounterSet counters;
- private final Collection<UncommittedBundle<?>> outputBundles;
- private final InProcessStepContext stepContext;
-
- private final ImmutableList.Builder<WindowedValue<T>> unprocessedElements;
-
- private ParDoInProcessEvaluator(
- PushbackSideInputDoFnRunner<T, ?> fnRunner,
- AppliedPTransform<PCollection<T>, ?, ?> transform,
- CounterSet counters,
- Collection<UncommittedBundle<?>> outputBundles,
- InProcessStepContext stepContext) {
- this.fnRunner = fnRunner;
- this.transform = transform;
- this.counters = counters;
- this.outputBundles = outputBundles;
- this.stepContext = stepContext;
-
- this.unprocessedElements = ImmutableList.builder();
- }
-
- @Override
- public void processElement(WindowedValue<T> element) {
- try {
- Iterable<WindowedValue<T>> unprocessed = fnRunner.processElementInReadyWindows(element);
- unprocessedElements.addAll(unprocessed);
- } catch (Exception e) {
- throw UserCodeException.wrap(e);
- }
- }
-
- @Override
- public InProcessTransformResult finishBundle() {
- try {
- fnRunner.finishBundle();
- } catch (Exception e) {
- throw UserCodeException.wrap(e);
- }
- StepTransformResult.Builder resultBuilder;
- CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState();
- if (state != null) {
- resultBuilder =
- StepTransformResult.withHold(transform, state.getEarliestWatermarkHold())
- .withState(state);
- } else {
- resultBuilder = StepTransformResult.withoutHold(transform);
- }
- return resultBuilder
- .addOutput(outputBundles)
- .withTimerUpdate(stepContext.getTimerUpdate())
- .withCounters(counters)
- .addUnprocessedElements(unprocessedElements.build())
- .build();
- }
-
- static class BundleOutputManager implements OutputManager {
- private final Map<TupleTag<?>, UncommittedBundle<?>> bundles;
- private final Map<TupleTag<?>, List<?>> undeclaredOutputs;
-
- public static BundleOutputManager create(Map<TupleTag<?>, UncommittedBundle<?>> outputBundles) {
- return new BundleOutputManager(outputBundles);
- }
-
- private BundleOutputManager(Map<TupleTag<?>, UncommittedBundle<?>> bundles) {
- this.bundles = bundles;
- undeclaredOutputs = new HashMap<>();
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
- @SuppressWarnings("rawtypes")
- UncommittedBundle bundle = bundles.get(tag);
- if (bundle == null) {
- List undeclaredContents = undeclaredOutputs.get(tag);
- if (undeclaredContents == null) {
- undeclaredContents = new ArrayList<T>();
- undeclaredOutputs.put(tag, undeclaredContents);
- }
- undeclaredContents.add(output);
- } else {
- bundle.add(output);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
index 58d6f00..e008bdc 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
@@ -47,7 +47,7 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
public <T> TransformEvaluator<T> forApplication(
AppliedPTransform<?, ?, ?> application,
CommittedBundle<?> inputBundle,
- InProcessEvaluationContext evaluationContext) {
+ EvaluationContext evaluationContext) {
@SuppressWarnings({"unchecked", "rawtypes"})
TransformEvaluator<T> evaluator =
createMultiEvaluator((AppliedPTransform) application, inputBundle, evaluationContext);
@@ -57,14 +57,14 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
private <InT, OuT> TransformEvaluator<InT> createMultiEvaluator(
AppliedPTransform<PCollection<InT>, PCollectionTuple, BoundMulti<InT, OuT>> application,
CommittedBundle<InT> inputBundle,
- InProcessEvaluationContext evaluationContext) {
+ EvaluationContext evaluationContext) {
Map<TupleTag<?>, PCollection<?>> outputs = application.getOutput().getAll();
DoFn<InT, OuT> fn = application.getTransform().getFn();
@SuppressWarnings({"unchecked", "rawtypes"}) ThreadLocal<DoFn<InT, OuT>> fnLocal =
(ThreadLocal) fnClones.getUnchecked(application.getTransform().getFn());
try {
- TransformEvaluator<InT> parDoEvaluator = ParDoInProcessEvaluator.create(evaluationContext,
+ TransformEvaluator<InT> parDoEvaluator = ParDoEvaluator.create(evaluationContext,
inputBundle,
application,
fnLocal.get(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
index afbb6ed..0f7fc83 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
@@ -47,7 +47,7 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
public <T> TransformEvaluator<T> forApplication(
final AppliedPTransform<?, ?, ?> application,
CommittedBundle<?> inputBundle,
- InProcessEvaluationContext evaluationContext) {
+ EvaluationContext evaluationContext) {
@SuppressWarnings({"unchecked", "rawtypes"})
TransformEvaluator<T> evaluator =
createSingleEvaluator((AppliedPTransform) application, inputBundle, evaluationContext);
@@ -57,13 +57,13 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
private <InputT, OutputT> TransformEvaluator<InputT> createSingleEvaluator(
@SuppressWarnings("rawtypes") AppliedPTransform<PCollection<InputT>, PCollection<OutputT>,
Bound<InputT, OutputT>> application,
- CommittedBundle<InputT> inputBundle, InProcessEvaluationContext evaluationContext) {
+ CommittedBundle<InputT> inputBundle, EvaluationContext evaluationContext) {
TupleTag<OutputT> mainOutputTag = new TupleTag<>("out");
@SuppressWarnings({"unchecked", "rawtypes"}) ThreadLocal<DoFn<InputT, OutputT>> fnLocal =
(ThreadLocal) fnClones.getUnchecked(application.getTransform().getFn());
try {
- ParDoInProcessEvaluator<InputT> parDoEvaluator = ParDoInProcessEvaluator.create(
+ ParDoEvaluator<InputT> parDoEvaluator = ParDoEvaluator.create(
evaluationContext,
inputBundle,
application,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
index ba792d3..c6e10e5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
@@ -42,7 +42,7 @@ class PassthroughTransformEvaluator<InputT> implements TransformEvaluator<InputT
}
@Override
- public InProcessTransformResult finishBundle() throws Exception {
+ public TransformResult finishBundle() throws Exception {
return StepTransformResult.withoutHold(transform).addOutput(output).build();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java
new file mode 100644
index 0000000..76df11c
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+
+import java.util.Collection;
+
+/**
+ * An executor that schedules and executes {@link AppliedPTransform AppliedPTransforms} for both
+ * source and intermediate {@link PTransform PTransforms}.
+ */
+interface PipelineExecutor {
+ /**
+ * Starts this executor. The provided collection is the collection of root transforms to
+ * initially schedule.
+ *
+ * @param rootTransforms the root transforms to schedule when this executor starts
+ */
+ void start(Collection<AppliedPTransform<?, ?, ?>> rootTransforms);
+
+ /**
+ * Blocks until the job being executed enters a terminal state. A job is completed after all
+ * root {@link AppliedPTransform AppliedPTransforms} have completed, and all
+ * {@link CommittedBundle Bundles} have been consumed. Jobs may also terminate abnormally.
+ *
+ * @throws Throwable whenever an executor thread throws anything, transfers the throwable to the
+ * waiting thread and rethrows it
+ */
+ void awaitCompletion() throws Throwable;
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
new file mode 100644
index 0000000..7a19ed9
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.PCollectionViewWindow;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Optional;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.Nullable;
+
+/**
+ * An in-process container for {@link PCollectionView PCollectionViews}, which provides methods for
+ * constructing {@link SideInputReader SideInputReaders} which report whether a side input is
+ * ready before it is read, and for writing to a {@link PCollectionView}.
+ */
+class SideInputContainer {
+ private final Collection<PCollectionView<?>> containedViews;
+ private final LoadingCache<
+ PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>>
+ viewByWindows;
+
+ /**
+ * Create a new {@link SideInputContainer} with the provided views and the provided
+ * context.
+ */
+ public static SideInputContainer create(
+ final EvaluationContext context, Collection<PCollectionView<?>> containedViews) {
+ LoadingCache<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>>
+ viewByWindows = CacheBuilder.newBuilder().build(new CallbackSchedulingLoader(context));
+ return new SideInputContainer(containedViews, viewByWindows);
+ }
+
+ private SideInputContainer(
+ Collection<PCollectionView<?>> containedViews,
+ LoadingCache<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>>
+ viewByWindows) {
+ this.containedViews = ImmutableSet.copyOf(containedViews);
+ this.viewByWindows = viewByWindows;
+ }
+
+ /**
+ * Return a {@link ReadyCheckingSideInputReader} that reads only the views in the provided
+ * argument from this {@link SideInputContainer}. The reader loads the contents of a view and
+ * window from this container on first request and caches them for later requests.
+ */
+ public ReadyCheckingSideInputReader createReaderForViews(
+ Collection<PCollectionView<?>> newContainedViews) {
+ if (!containedViews.containsAll(newContainedViews)) {
+ Set<PCollectionView<?>> currentlyContained = ImmutableSet.copyOf(containedViews);
+ Set<PCollectionView<?>> newRequested = ImmutableSet.copyOf(newContainedViews);
+ throw new IllegalArgumentException("Can't create a SideInputReader with unknown views "
+ + Sets.difference(newRequested, currentlyContained));
+ }
+ return new SideInputContainerSideInputReader(newContainedViews);
+ }
+
+ /**
+ * Write the provided values to the provided view.
+ *
+ * <p>The windowed values are first exploded, then for each window the pane is determined. For
+ * each window, if the pane is later than the current pane stored within this container, write
+ * all of the values to the container as the new values of the {@link PCollectionView}.
+ *
+ * <p>The provided iterable is expected to contain only a single window and pane.
+ */
+ public void write(PCollectionView<?> view, Iterable<? extends WindowedValue<?>> values) {
+ Map<BoundedWindow, Collection<WindowedValue<?>>> valuesPerWindow =
+ indexValuesByWindow(values);
+ for (Map.Entry<BoundedWindow, Collection<WindowedValue<?>>> windowValues :
+ valuesPerWindow.entrySet()) {
+ updatePCollectionViewWindowValues(view, windowValues.getKey(), windowValues.getValue());
+ }
+ }
+
+ /**
+ * Index the provided values by all {@link BoundedWindow windows} in which they appear.
+ */
+ private Map<BoundedWindow, Collection<WindowedValue<?>>> indexValuesByWindow(
+ Iterable<? extends WindowedValue<?>> values) {
+ Map<BoundedWindow, Collection<WindowedValue<?>>> valuesPerWindow = new HashMap<>();
+ for (WindowedValue<?> value : values) {
+ for (BoundedWindow window : value.getWindows()) {
+ Collection<WindowedValue<?>> windowValues = valuesPerWindow.get(window);
+ if (windowValues == null) {
+ windowValues = new ArrayList<>();
+ valuesPerWindow.put(window, windowValues);
+ }
+ windowValues.add(value);
+ }
+ }
+ return valuesPerWindow;
+ }
+
+ /**
+ * Set the value of the {@link PCollectionView} in the {@link BoundedWindow} to be based on the
+ * specified values, if the values are part of a later pane than currently exist within the
+ * {@link PCollectionViewWindow}.
+ */
+ private void updatePCollectionViewWindowValues(
+ PCollectionView<?> view, BoundedWindow window, Collection<WindowedValue<?>> windowValues) {
+ PCollectionViewWindow<?> windowedView = PCollectionViewWindow.of(view, window);
+ AtomicReference<Iterable<? extends WindowedValue<?>>> contents =
+ viewByWindows.getUnchecked(windowedView);
+ if (contents.compareAndSet(null, windowValues)) {
+ // the value had never been set, so we set it and are done.
+ return;
+ }
+ PaneInfo newPane = windowValues.iterator().next().getPane();
+
+ Iterable<? extends WindowedValue<?>> existingValues;
+ long existingPane;
+ do {
+ existingValues = contents.get();
+ existingPane =
+ Iterables.isEmpty(existingValues)
+ ? -1L
+ : existingValues.iterator().next().getPane().getIndex();
+ } while (newPane.getIndex() > existingPane
+ && !contents.compareAndSet(existingValues, windowValues));
+ }
+
+ private static class CallbackSchedulingLoader extends
+ CacheLoader<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>> {
+ private final EvaluationContext context;
+
+ public CallbackSchedulingLoader(
+ EvaluationContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public AtomicReference<Iterable<? extends WindowedValue<?>>>
+ load(PCollectionViewWindow<?> view) {
+
+ AtomicReference<Iterable<? extends WindowedValue<?>>> contents = new AtomicReference<>();
+ WindowingStrategy<?, ?> windowingStrategy = view.getView().getWindowingStrategyInternal();
+
+ context.scheduleAfterOutputWouldBeProduced(view.getView(),
+ view.getWindow(),
+ windowingStrategy,
+ new WriteEmptyViewContents(view.getView(), view.getWindow(), contents));
+ return contents;
+ }
+ }
+
+ private static class WriteEmptyViewContents implements Runnable {
+ private final PCollectionView<?> view;
+ private final BoundedWindow window;
+ private final AtomicReference<Iterable<? extends WindowedValue<?>>> contents;
+
+ private WriteEmptyViewContents(PCollectionView<?> view, BoundedWindow window,
+ AtomicReference<Iterable<? extends WindowedValue<?>>> contents) {
+ this.contents = contents;
+ this.view = view;
+ this.window = window;
+ }
+
+ @Override
+ public void run() {
+ // The requested window has closed without producing elements, so reflect that in
+ // the PCollectionView. If the contents have already been set, this does nothing.
+ contents.compareAndSet(null, Collections.<WindowedValue<?>>emptyList());
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("view", view)
+ .add("window", window)
+ .toString();
+ }
+ }
+
+ private final class SideInputContainerSideInputReader implements ReadyCheckingSideInputReader {
+ private final Collection<PCollectionView<?>> readerViews;
+ private final LoadingCache<
+ PCollectionViewWindow<?>, Optional<? extends Iterable<? extends WindowedValue<?>>>>
+ viewContents;
+
+ private SideInputContainerSideInputReader(Collection<PCollectionView<?>> readerViews) {
+ this.readerViews = ImmutableSet.copyOf(readerViews);
+ this.viewContents = CacheBuilder.newBuilder().build(new CurrentViewContentsLoader());
+ }
+
+ @Override
+ public boolean isReady(final PCollectionView<?> view, final BoundedWindow window) {
+ checkArgument(
+ readerViews.contains(view),
+ "Tried to check if view %s was ready in a SideInputReader that does not contain it. "
+ + "Contained views; %s",
+ view,
+ readerViews);
+ return viewContents.getUnchecked(PCollectionViewWindow.of(view, window)).isPresent();
+ }
+
+ @Override
+ @Nullable
+ public <T> T get(final PCollectionView<T> view, final BoundedWindow window) {
+ checkArgument(readerViews.contains(view),
+ "call to get(PCollectionView) with unknown view: %s",
+ view);
+ checkArgument(
+ isReady(view, window),
+ "calling get() on PCollectionView %s that is not ready in window %s",
+ view,
+ window);
+ // Safe covariant cast
+ @SuppressWarnings("unchecked") Iterable<WindowedValue<?>> values =
+ (Iterable<WindowedValue<?>>) viewContents.getUnchecked(PCollectionViewWindow.of(view,
+ window)).get();
+ return view.fromIterableInternal(values);
+ }
+
+ @Override
+ public <T> boolean contains(PCollectionView<T> view) {
+ return readerViews.contains(view);
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return readerViews.isEmpty();
+ }
+ }
+
+ /**
+ * A {@link CacheLoader} that loads the current contents of a {@link PCollectionViewWindow} into
+ * an optional.
+ */
+ private class CurrentViewContentsLoader extends CacheLoader<
+ PCollectionViewWindow<?>, Optional<? extends Iterable<? extends WindowedValue<?>>>> {
+
+ @Override
+ public Optional<? extends Iterable<? extends WindowedValue<?>>>
+ load(PCollectionViewWindow<?> key) {
+ return Optional.fromNullable(viewByWindows.getUnchecked(key).get());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
index eacea91..5706b2a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
@@ -19,8 +19,8 @@ package org.apache.beam.runners.direct;
import static com.google.common.base.Preconditions.checkNotNull;
-import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
@@ -37,9 +37,9 @@ import java.util.Collection;
import javax.annotation.Nullable;
/**
- * An immutable {@link InProcessTransformResult}.
+ * An immutable {@link TransformResult}.
*/
-public class StepTransformResult implements InProcessTransformResult {
+public class StepTransformResult implements TransformResult {
private final AppliedPTransform<?, ?, ?> transform;
private final Iterable<? extends UncommittedBundle<?>> bundles;
private final Iterable<? extends WindowedValue<?>> unprocessedElements;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ThreadLocalInvalidatingTransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ThreadLocalInvalidatingTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ThreadLocalInvalidatingTransformEvaluator.java
index 36bdff4..d8a6bf9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ThreadLocalInvalidatingTransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ThreadLocalInvalidatingTransformEvaluator.java
@@ -52,7 +52,7 @@ class ThreadLocalInvalidatingTransformEvaluator<InputT>
}
@Override
- public InProcessTransformResult finishBundle() throws Exception {
+ public TransformResult finishBundle() throws Exception {
try {
return underlying.finishBundle();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
index 1fec9d8..6c8e48b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
@@ -40,7 +40,7 @@ public interface TransformEvaluator<InputT> {
* After {@link #finishBundle()} is called, the {@link TransformEvaluator} will not be reused,
* and no more elements will be processed.
*
- * @return an {@link InProcessTransformResult} containing the results of this bundle evaluation.
+ * @return a {@link TransformResult} containing the results of this bundle evaluation.
*/
- InProcessTransformResult finishBundle() throws Exception;
+ TransformResult finishBundle() throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
index b12a34c..1973a2f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
@@ -46,5 +46,5 @@ public interface TransformEvaluatorFactory {
*/
@Nullable <InputT> TransformEvaluator<InputT> forApplication(
AppliedPTransform<?, ?, ?> application, @Nullable CommittedBundle<?> inputBundle,
- InProcessEvaluationContext evaluationContext) throws Exception;
+ EvaluationContext evaluationContext) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index dfc1753..f0afc3b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -17,8 +17,8 @@
*/
package org.apache.beam.runners.direct;
-import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupAlsoByWindow;
-import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly;
+import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
+import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -50,8 +50,8 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
.put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory())
.put(Window.Bound.class, new WindowEvaluatorFactory())
// Runner-specific primitives used in expansion of GroupByKey
- .put(InProcessGroupByKeyOnly.class, new InProcessGroupByKeyOnlyEvaluatorFactory())
- .put(InProcessGroupAlsoByWindow.class, new InProcessGroupAlsoByWindowEvaluatorFactory())
+ .put(DirectGroupByKeyOnly.class, new GroupByKeyOnlyEvaluatorFactory())
+ .put(DirectGroupAlsoByWindow.class, new GroupAlsoByWindowEvaluatorFactory())
.build();
return new TransformEvaluatorRegistry(primitives);
}
@@ -71,7 +71,7 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
public <InputT> TransformEvaluator<InputT> forApplication(
AppliedPTransform<?, ?, ?> application,
@Nullable CommittedBundle<?> inputBundle,
- InProcessEvaluationContext evaluationContext)
+ EvaluationContext evaluationContext)
throws Exception {
TransformEvaluatorFactory factory = factories.get(application.getTransform().getClass());
return factory.forApplication(application, inputBundle, evaluationContext);