You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/04/29 23:56:11 UTC
[13/17] incubator-beam git commit: Move InProcessRunner to its own
module
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java
new file mode 100644
index 0000000..1c51738
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessExecutionContext.InProcessStepContext;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.DoFnRunner;
+import org.apache.beam.sdk.util.DoFnRunners;
+import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+class ParDoInProcessEvaluator<T> implements TransformEvaluator<T> {
+ public static <InputT, OutputT> ParDoInProcessEvaluator<InputT> create(
+ InProcessEvaluationContext evaluationContext,
+ CommittedBundle<InputT> inputBundle,
+ AppliedPTransform<PCollection<InputT>, ?, ?> application,
+ DoFn<InputT, OutputT> fn,
+ List<PCollectionView<?>> sideInputs,
+ TupleTag<OutputT> mainOutputTag,
+ List<TupleTag<?>> sideOutputTags,
+ Map<TupleTag<?>, PCollection<?>> outputs) {
+ InProcessExecutionContext executionContext =
+ evaluationContext.getExecutionContext(application, inputBundle.getKey());
+ String stepName = evaluationContext.getStepName(application);
+ InProcessStepContext stepContext =
+ executionContext.getOrCreateStepContext(stepName, stepName);
+
+ CounterSet counters = evaluationContext.createCounterSet();
+
+ Map<TupleTag<?>, UncommittedBundle<?>> outputBundles = new HashMap<>();
+ for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
+ outputBundles.put(
+ outputEntry.getKey(),
+ evaluationContext.createBundle(inputBundle, outputEntry.getValue()));
+ }
+
+ DoFnRunner<InputT, OutputT> runner =
+ DoFnRunners.createDefault(
+ evaluationContext.getPipelineOptions(),
+ SerializableUtils.clone(fn),
+ evaluationContext.createSideInputReader(sideInputs),
+ BundleOutputManager.create(outputBundles),
+ mainOutputTag,
+ sideOutputTags,
+ stepContext,
+ counters.getAddCounterMutator(),
+ application.getInput().getWindowingStrategy());
+
+ try {
+ runner.startBundle();
+ } catch (Exception e) {
+ throw UserCodeException.wrap(e);
+ }
+
+ return new ParDoInProcessEvaluator<>(
+ runner, application, counters, outputBundles.values(), stepContext);
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////
+
+ private final DoFnRunner<T, ?> fnRunner;
+ private final AppliedPTransform<PCollection<T>, ?, ?> transform;
+ private final CounterSet counters;
+ private final Collection<UncommittedBundle<?>> outputBundles;
+ private final InProcessStepContext stepContext;
+
+ private ParDoInProcessEvaluator(
+ DoFnRunner<T, ?> fnRunner,
+ AppliedPTransform<PCollection<T>, ?, ?> transform,
+ CounterSet counters,
+ Collection<UncommittedBundle<?>> outputBundles,
+ InProcessStepContext stepContext) {
+ this.fnRunner = fnRunner;
+ this.transform = transform;
+ this.counters = counters;
+ this.outputBundles = outputBundles;
+ this.stepContext = stepContext;
+ }
+
+ @Override
+ public void processElement(WindowedValue<T> element) {
+ try {
+ fnRunner.processElement(element);
+ } catch (Exception e) {
+ throw UserCodeException.wrap(e);
+ }
+ }
+
+ @Override
+ public InProcessTransformResult finishBundle() {
+ try {
+ fnRunner.finishBundle();
+ } catch (Exception e) {
+ throw UserCodeException.wrap(e);
+ }
+ StepTransformResult.Builder resultBuilder;
+ CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState();
+ if (state != null) {
+ resultBuilder =
+ StepTransformResult.withHold(transform, state.getEarliestWatermarkHold())
+ .withState(state);
+ } else {
+ resultBuilder = StepTransformResult.withoutHold(transform);
+ }
+ return resultBuilder
+ .addOutput(outputBundles)
+ .withTimerUpdate(stepContext.getTimerUpdate())
+ .withCounters(counters)
+ .build();
+ }
+
+ static class BundleOutputManager implements OutputManager {
+ private final Map<TupleTag<?>, UncommittedBundle<?>> bundles;
+ private final Map<TupleTag<?>, List<?>> undeclaredOutputs;
+
+ public static BundleOutputManager create(Map<TupleTag<?>, UncommittedBundle<?>> outputBundles) {
+ return new BundleOutputManager(outputBundles);
+ }
+
+ private BundleOutputManager(Map<TupleTag<?>, UncommittedBundle<?>> bundles) {
+ this.bundles = bundles;
+ undeclaredOutputs = new HashMap<>();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+ @SuppressWarnings("rawtypes")
+ UncommittedBundle bundle = bundles.get(tag);
+ if (bundle == null) {
+ List undeclaredContents = undeclaredOutputs.get(tag);
+ if (undeclaredContents == null) {
+ undeclaredContents = new ArrayList<T>();
+ undeclaredOutputs.put(tag, undeclaredContents);
+ }
+ undeclaredContents.add(output);
+ } else {
+ bundle.add(output);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
new file mode 100644
index 0000000..ae8ac6f
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo.BoundMulti;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+
+import java.util.Map;
+
+/**
+ * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
+ * {@link BoundMulti} primitive {@link PTransform}.
+ */
+class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
+ @Override
+ public <T> TransformEvaluator<T> forApplication(
+ AppliedPTransform<?, ?, ?> application,
+ CommittedBundle<?> inputBundle,
+ InProcessEvaluationContext evaluationContext) {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ TransformEvaluator<T> evaluator =
+ createMultiEvaluator((AppliedPTransform) application, inputBundle, evaluationContext);
+ return evaluator;
+ }
+
+ private static <InT, OuT> ParDoInProcessEvaluator<InT> createMultiEvaluator(
+ AppliedPTransform<PCollection<InT>, PCollectionTuple, BoundMulti<InT, OuT>> application,
+ CommittedBundle<InT> inputBundle,
+ InProcessEvaluationContext evaluationContext) {
+ Map<TupleTag<?>, PCollection<?>> outputs = application.getOutput().getAll();
+ DoFn<InT, OuT> fn = application.getTransform().getFn();
+
+ return ParDoInProcessEvaluator.create(
+ evaluationContext,
+ inputBundle,
+ application,
+ fn,
+ application.getTransform().getSideInputs(),
+ application.getTransform().getMainOutputTag(),
+ application.getTransform().getSideOutputTags().getAll(),
+ outputs);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
new file mode 100644
index 0000000..989ae51
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo.Bound;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Collections;
+
+/**
+ * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
+ * {@link Bound ParDo.Bound} primitive {@link PTransform}.
+ */
+class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
+ @Override
+ public <T> TransformEvaluator<T> forApplication(
+ final AppliedPTransform<?, ?, ?> application,
+ CommittedBundle<?> inputBundle,
+ InProcessEvaluationContext evaluationContext) {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ TransformEvaluator<T> evaluator =
+ createSingleEvaluator((AppliedPTransform) application, inputBundle, evaluationContext);
+ return evaluator;
+ }
+
+ private static <InputT, OutputT> ParDoInProcessEvaluator<InputT> createSingleEvaluator(
+ @SuppressWarnings("rawtypes") AppliedPTransform<PCollection<InputT>, PCollection<OutputT>,
+ Bound<InputT, OutputT>> application,
+ CommittedBundle<InputT> inputBundle, InProcessEvaluationContext evaluationContext) {
+ TupleTag<OutputT> mainOutputTag = new TupleTag<>("out");
+
+ return ParDoInProcessEvaluator.create(
+ evaluationContext,
+ inputBundle,
+ application,
+ application.getTransform().getFn(),
+ application.getTransform().getSideInputs(),
+ mainOutputTag,
+ Collections.<TupleTag<?>>emptyList(),
+ ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, application.getOutput()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
new file mode 100644
index 0000000..aef62b2
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+
+class PassthroughTransformEvaluator<InputT> implements TransformEvaluator<InputT> {
+ public static <InputT> PassthroughTransformEvaluator<InputT> create(
+ AppliedPTransform<?, ?, ?> transform, UncommittedBundle<InputT> output) {
+ return new PassthroughTransformEvaluator<>(transform, output);
+ }
+
+ private final AppliedPTransform<?, ?, ?> transform;
+ private final UncommittedBundle<InputT> output;
+
+ private PassthroughTransformEvaluator(
+ AppliedPTransform<?, ?, ?> transform, UncommittedBundle<InputT> output) {
+ this.transform = transform;
+ this.output = output;
+ }
+
+ @Override
+ public void processElement(WindowedValue<InputT> element) throws Exception {
+ output.add(element);
+ }
+
+ @Override
+ public InProcessTransformResult finishBundle() throws Exception {
+ return StepTransformResult.withoutHold(transform).addOutput(output).build();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ShardControlledWrite.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ShardControlledWrite.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ShardControlledWrite.java
new file mode 100644
index 0000000..4687f85
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ShardControlledWrite.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Partition;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PDone;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * A write that explicitly controls its number of output shards.
+ */
+abstract class ShardControlledWrite<InputT>
+ extends ForwardingPTransform<PCollection<InputT>, PDone> {
+ @Override
+ public PDone apply(PCollection<InputT> input) {
+ int numShards = getNumShards();
+ checkArgument(
+ numShards >= 1,
+ "%s should only be applied if the output has a controlled number of shards (> 1); got %s",
+ getClass().getSimpleName(),
+ getNumShards());
+ PCollectionList<InputT> shards =
+ input.apply(
+ "PartitionInto" + numShards + "Shards",
+ Partition.of(getNumShards(), new RandomSeedPartitionFn<InputT>()));
+ for (int i = 0; i < shards.size(); i++) {
+ PCollection<InputT> shard = shards.get(i);
+ PTransform<? super PCollection<InputT>, PDone> writeShard = getSingleShardTransform(i);
+ shard.apply(String.format("%s(Shard:%s)", writeShard.getName(), i), writeShard);
+ }
+ return PDone.in(input.getPipeline());
+ }
+
+ /**
+ * Returns the number of shards this {@link PTransform} should write to.
+ */
+ abstract int getNumShards();
+
+ /**
+ * Returns a {@link PTransform} that performs a write to the shard with the specified shard
+ * number.
+ *
+ * <p>This method will be called n times, where n is the value of {@link #getNumShards()}, for
+ * shard numbers {@code [0...n)}.
+ */
+ abstract PTransform<? super PCollection<InputT>, PDone> getSingleShardTransform(int shardNum);
+
+ private static class RandomSeedPartitionFn<T> implements Partition.PartitionFn<T> {
+ int nextPartition = -1;
+ @Override
+ public int partitionFor(T elem, int numPartitions) {
+ if (nextPartition < 0) {
+ nextPartition = ThreadLocalRandom.current().nextInt(numPartitions);
+ }
+ nextPartition++;
+ nextPartition %= numPartitions;
+ return nextPartition;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java
new file mode 100644
index 0000000..1c7cf6c
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+
+import com.google.common.base.MoreObjects;
+
+import java.util.Objects;
+
+/**
+ * A (Step, Key) pair. This is useful as a map key or cache key for things that are available
+ * per-step in a keyed manner (e.g. State).
+ */
+final class StepAndKey {
+ private final AppliedPTransform<?, ?, ?> step;
+ private final Object key;
+
+ /**
+ * Create a new {@link StepAndKey} with the provided step and key.
+ */
+ public static StepAndKey of(AppliedPTransform<?, ?, ?> step, Object key) {
+ return new StepAndKey(step, key);
+ }
+
+ private StepAndKey(AppliedPTransform<?, ?, ?> step, Object key) {
+ this.step = step;
+ this.key = key;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(StepAndKey.class)
+ .add("step", step.getFullName())
+ .add("key", key)
+ .toString();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(step, key);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == this) {
+ return true;
+ } else if (!(other instanceof StepAndKey)) {
+ return false;
+ } else {
+ StepAndKey that = (StepAndKey) other;
+ return Objects.equals(this.step, that.step)
+ && Objects.equals(this.key, that.key);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
new file mode 100644
index 0000000..46e7d04
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+
+import org.joda.time.Instant;
+
+import java.util.Collection;
+
+import javax.annotation.Nullable;
+
+/**
+ * An immutable {@link InProcessTransformResult}.
+ */
+public class StepTransformResult implements InProcessTransformResult {
+ private final AppliedPTransform<?, ?, ?> transform;
+ private final Iterable<? extends UncommittedBundle<?>> bundles;
+ @Nullable private final CopyOnAccessInMemoryStateInternals<?> state;
+ private final TimerUpdate timerUpdate;
+ @Nullable private final CounterSet counters;
+ private final Instant watermarkHold;
+
+ private StepTransformResult(
+ AppliedPTransform<?, ?, ?> transform,
+ Iterable<? extends UncommittedBundle<?>> outputBundles,
+ CopyOnAccessInMemoryStateInternals<?> state,
+ TimerUpdate timerUpdate,
+ CounterSet counters,
+ Instant watermarkHold) {
+ this.transform = checkNotNull(transform);
+ this.bundles = checkNotNull(outputBundles);
+ this.state = state;
+ this.timerUpdate = checkNotNull(timerUpdate);
+ this.counters = counters;
+ this.watermarkHold = checkNotNull(watermarkHold);
+ }
+
+ @Override
+ public Iterable<? extends UncommittedBundle<?>> getOutputBundles() {
+ return bundles;
+ }
+
+ @Override
+ public CounterSet getCounters() {
+ return counters;
+ }
+
+ @Override
+ public AppliedPTransform<?, ?, ?> getTransform() {
+ return transform;
+ }
+
+ @Override
+ public Instant getWatermarkHold() {
+ return watermarkHold;
+ }
+
+ @Nullable
+ @Override
+ public CopyOnAccessInMemoryStateInternals<?> getState() {
+ return state;
+ }
+
+ @Override
+ public TimerUpdate getTimerUpdate() {
+ return timerUpdate;
+ }
+
+ public static Builder withHold(AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) {
+ return new Builder(transform, watermarkHold);
+ }
+
+ public static Builder withoutHold(AppliedPTransform<?, ?, ?> transform) {
+ return new Builder(transform, BoundedWindow.TIMESTAMP_MAX_VALUE);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(StepTransformResult.class)
+ .add("transform", transform)
+ .toString();
+ }
+
+ /**
+ * A builder for creating instances of {@link StepTransformResult}.
+ */
+ public static class Builder {
+ private final AppliedPTransform<?, ?, ?> transform;
+ private final ImmutableList.Builder<UncommittedBundle<?>> bundlesBuilder;
+ private CopyOnAccessInMemoryStateInternals<?> state;
+ private TimerUpdate timerUpdate;
+ private CounterSet counters;
+ private final Instant watermarkHold;
+
+ private Builder(AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) {
+ this.transform = transform;
+ this.watermarkHold = watermarkHold;
+ this.bundlesBuilder = ImmutableList.builder();
+ this.timerUpdate = TimerUpdate.builder(null).build();
+ }
+
+ public StepTransformResult build() {
+ return new StepTransformResult(
+ transform,
+ bundlesBuilder.build(),
+ state,
+ timerUpdate,
+ counters,
+ watermarkHold);
+ }
+
+ public Builder withCounters(CounterSet counters) {
+ this.counters = counters;
+ return this;
+ }
+
+ public Builder withState(CopyOnAccessInMemoryStateInternals<?> state) {
+ this.state = state;
+ return this;
+ }
+
+ public Builder withTimerUpdate(TimerUpdate timerUpdate) {
+ this.timerUpdate = timerUpdate;
+ return this;
+ }
+
+ public Builder addOutput(
+ UncommittedBundle<?> outputBundle, UncommittedBundle<?>... outputBundles) {
+ bundlesBuilder.add(outputBundle);
+ bundlesBuilder.add(outputBundles);
+ return this;
+ }
+
+ public Builder addOutput(Collection<UncommittedBundle<?>> outputBundles) {
+ bundlesBuilder.addAll(outputBundles);
+ return this;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TextIOShardedWriteFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TextIOShardedWriteFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TextIOShardedWriteFactory.java
new file mode 100644
index 0000000..be1bf18
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TextIOShardedWriteFactory.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.TextIO.Write.Bound;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+class TextIOShardedWriteFactory implements PTransformOverrideFactory {
+
+ @Override
+ public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
+ PTransform<InputT, OutputT> transform) {
+ if (transform instanceof TextIO.Write.Bound) {
+ @SuppressWarnings("unchecked")
+ TextIO.Write.Bound<InputT> originalWrite = (TextIO.Write.Bound<InputT>) transform;
+ if (originalWrite.getNumShards() > 1
+ || (originalWrite.getNumShards() == 1
+ && !"".equals(originalWrite.getShardNameTemplate()))) {
+ @SuppressWarnings("unchecked")
+ PTransform<InputT, OutputT> override =
+ (PTransform<InputT, OutputT>) new TextIOShardedWrite<InputT>(originalWrite);
+ return override;
+ }
+ }
+ return transform;
+ }
+
+ private static class TextIOShardedWrite<InputT> extends ShardControlledWrite<InputT> {
+ private final TextIO.Write.Bound<InputT> initial;
+
+ private TextIOShardedWrite(Bound<InputT> initial) {
+ this.initial = initial;
+ }
+
+ @Override
+ int getNumShards() {
+ return initial.getNumShards();
+ }
+
+ @Override
+ PTransform<PCollection<InputT>, PDone> getSingleShardTransform(int shardNum) {
+ String shardName =
+ IOChannelUtils.constructName(
+ initial.getFilenamePrefix(),
+ initial.getShardTemplate(),
+ initial.getFilenameSuffix(),
+ shardNum,
+ getNumShards());
+ return TextIO.Write.withCoder(initial.getCoder()).to(shardName).withoutSharding();
+ }
+
+ @Override
+ protected PTransform<PCollection<InputT>, PDone> delegate() {
+ return initial;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
new file mode 100644
index 0000000..ba9815b
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * An evaluator of a specific application of a transform. Will be used for at least one
+ * {@link CommittedBundle}.
+ *
+ * <p>Elements are delivered one at a time through {@link #processElement}; once the input has
+ * been delivered, {@link #finishBundle()} is called to obtain the result of the evaluation.
+ *
+ * @param <InputT> the type of elements that will be passed to {@link #processElement}
+ */
+public interface TransformEvaluator<InputT> {
+  /**
+   * Process an element in the input {@link CommittedBundle}.
+   *
+   * @param element the element to process
+   * @throws Exception if processing the element fails
+   */
+  void processElement(WindowedValue<InputT> element) throws Exception;
+
+  /**
+   * Finish processing the bundle of this {@link TransformEvaluator}.
+   *
+   * <p>After {@link #finishBundle()} is called, the {@link TransformEvaluator} will not be reused,
+   * and no more elements will be processed.
+   *
+   * @return an {@link InProcessTransformResult} containing the results of this bundle evaluation.
+   * @throws Exception if completing the bundle fails
+   */
+  InProcessTransformResult finishBundle() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
new file mode 100644
index 0000000..8f8d84c
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+
+import javax.annotation.Nullable;
+
+/**
+ * A factory for creating instances of {@link TransformEvaluator} for the application of a
+ * {@link PTransform}.
+ */
+public interface TransformEvaluatorFactory {
+  /**
+   * Create a new {@link TransformEvaluator} for the application of the {@link PTransform}.
+   *
+   * <p>Any work that must be done before input elements are processed (such as calling
+   * {@link DoFn#startBundle(DoFn.Context)}) must be done before the {@link TransformEvaluator} is
+   * made available to the caller.
+   *
+   * @param <InputT> the type of elements the returned evaluator processes
+   * @param application the application of a {@link PTransform} to evaluate
+   * @param inputBundle the bundle of input the evaluator will be used for; may be {@code null}
+   * @param evaluationContext the context of the pipeline execution the evaluation belongs to
+   * @return a {@link TransformEvaluator} ready to process elements
+   * @throws Exception whenever constructing the underlying evaluator throws an exception
+   */
+  <InputT> TransformEvaluator<InputT> forApplication(
+      AppliedPTransform<?, ?, ?> application, @Nullable CommittedBundle<?> inputBundle,
+      InProcessEvaluationContext evaluationContext) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
new file mode 100644
index 0000000..f449731
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link TransformEvaluatorFactory} that delegates to primitive {@link TransformEvaluatorFactory}
+ * implementations based on the type of {@link PTransform} of the application.
+ */
+class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
+ public static TransformEvaluatorRegistry defaultRegistry() {
+ @SuppressWarnings("rawtypes")
+ ImmutableMap<Class<? extends PTransform>, TransformEvaluatorFactory> primitives =
+ ImmutableMap.<Class<? extends PTransform>, TransformEvaluatorFactory>builder()
+ .put(Read.Bounded.class, new BoundedReadEvaluatorFactory())
+ .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory())
+ .put(ParDo.Bound.class, new ParDoSingleEvaluatorFactory())
+ .put(ParDo.BoundMulti.class, new ParDoMultiEvaluatorFactory())
+ .put(
+ GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly.class,
+ new GroupByKeyEvaluatorFactory())
+ .put(FlattenPCollectionList.class, new FlattenEvaluatorFactory())
+ .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory())
+ .put(Window.Bound.class, new WindowEvaluatorFactory())
+ .build();
+ return new TransformEvaluatorRegistry(primitives);
+ }
+
+ // the TransformEvaluatorFactories can construct instances of all generic types of transform,
+ // so all instances of a primitive can be handled with the same evaluator factory.
+ @SuppressWarnings("rawtypes")
+ private final Map<Class<? extends PTransform>, TransformEvaluatorFactory> factories;
+
+ private TransformEvaluatorRegistry(
+ @SuppressWarnings("rawtypes")
+ Map<Class<? extends PTransform>, TransformEvaluatorFactory> factories) {
+ this.factories = factories;
+ }
+
+ @Override
+ public <InputT> TransformEvaluator<InputT> forApplication(
+ AppliedPTransform<?, ?, ?> application,
+ @Nullable CommittedBundle<?> inputBundle,
+ InProcessEvaluationContext evaluationContext)
+ throws Exception {
+ TransformEvaluatorFactory factory = factories.get(application.getTransform().getClass());
+ return factory.forApplication(application, inputBundle, evaluationContext);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
new file mode 100644
index 0000000..8346e89
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+
+import com.google.common.base.Throwables;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link Callable} responsible for constructing a {@link TransformEvaluator} from a
+ * {@link TransformEvaluatorFactory} and evaluating it on some bundle of input, and registering
+ * the result using a registered {@link CompletionCallback}.
+ *
+ * <p>A {@link TransformExecutor} records the thread that executes it; see {@link #getThread()}.
+ * The recorded thread is never cleared, so each instance can be executed only once: any further
+ * call to {@link #call()} fails with an {@link IllegalStateException}.
+ *
+ * @param <T> the type of elements in the input bundle
+ */
+class TransformExecutor<T> implements Callable<InProcessTransformResult> {
+  /**
+   * Creates a {@link TransformExecutor} that evaluates {@code inputBundle} with an evaluator from
+   * {@code factory}, reports to {@code completionCallback}, and notifies
+   * {@code transformEvaluationState} when it finishes.
+   */
+  public static <T> TransformExecutor<T> create(
+      TransformEvaluatorFactory factory,
+      Iterable<? extends ModelEnforcementFactory> modelEnforcements,
+      InProcessEvaluationContext evaluationContext,
+      CommittedBundle<T> inputBundle,
+      AppliedPTransform<?, ?, ?> transform,
+      CompletionCallback completionCallback,
+      TransformExecutorService transformEvaluationState) {
+    return new TransformExecutor<>(
+        factory,
+        modelEnforcements,
+        evaluationContext,
+        inputBundle,
+        transform,
+        completionCallback,
+        transformEvaluationState);
+  }
+
+  private final TransformEvaluatorFactory evaluatorFactory;
+  private final Iterable<? extends ModelEnforcementFactory> modelEnforcements;
+
+  private final InProcessEvaluationContext evaluationContext;
+
+  /** The transform that will be evaluated. */
+  private final AppliedPTransform<?, ?, ?> transform;
+  /** The inputs this {@link TransformExecutor} will deliver to the transform. */
+  private final CommittedBundle<T> inputBundle;
+
+  private final CompletionCallback onComplete;
+  private final TransformExecutorService transformEvaluationState;
+
+  /** The thread that executed (or is executing) this executor; {@code null} until it starts. */
+  private final AtomicReference<Thread> thread;
+
+  private TransformExecutor(
+      TransformEvaluatorFactory factory,
+      Iterable<? extends ModelEnforcementFactory> modelEnforcements,
+      InProcessEvaluationContext evaluationContext,
+      CommittedBundle<T> inputBundle,
+      AppliedPTransform<?, ?, ?> transform,
+      CompletionCallback completionCallback,
+      TransformExecutorService transformEvaluationState) {
+    this.evaluatorFactory = factory;
+    this.modelEnforcements = modelEnforcements;
+    this.evaluationContext = evaluationContext;
+
+    this.inputBundle = inputBundle;
+    this.transform = transform;
+
+    this.onComplete = completionCallback;
+
+    this.transformEvaluationState = transformEvaluationState;
+    this.thread = new AtomicReference<>();
+  }
+
+  /**
+   * Evaluates the input bundle. Creates the model enforcements and the evaluator, processes every
+   * element, finishes the bundle, and returns its result.
+   *
+   * <p>Any failure is reported to the {@link CompletionCallback} and then rethrown, with checked
+   * exceptions wrapped in a {@link RuntimeException}. Once the guard below has passed, the
+   * {@link TransformExecutorService} is notified of completion whether or not evaluation succeeded.
+   *
+   * @throws IllegalStateException if this executor has already been executed
+   */
+  @Override
+  public InProcessTransformResult call() {
+    checkState(
+        thread.compareAndSet(null, Thread.currentThread()),
+        "Tried to execute %s for %s on thread %s, but is already executing on thread %s",
+        TransformExecutor.class.getSimpleName(),
+        transform.getFullName(),
+        Thread.currentThread(),
+        thread.get());
+    try {
+      Collection<ModelEnforcement<T>> enforcements = new ArrayList<>();
+      for (ModelEnforcementFactory enforcementFactory : modelEnforcements) {
+        ModelEnforcement<T> enforcement = enforcementFactory.forBundle(inputBundle, transform);
+        enforcements.add(enforcement);
+      }
+      TransformEvaluator<T> evaluator =
+          evaluatorFactory.forApplication(transform, inputBundle, evaluationContext);
+
+      processElements(evaluator, enforcements);
+
+      return finishBundle(evaluator, enforcements);
+    } catch (Throwable t) {
+      onComplete.handleThrowable(inputBundle, t);
+      throw Throwables.propagate(t);
+    } finally {
+      // Always notify the executor service, which may then start additional work.
+      transformEvaluationState.complete(this);
+    }
+  }
+
+  /**
+   * Processes all the elements in the input bundle using the transform evaluator, applying any
+   * necessary {@link ModelEnforcement ModelEnforcements} before and after each element. Does
+   * nothing when there is no input bundle.
+   */
+  private void processElements(
+      TransformEvaluator<T> evaluator, Collection<ModelEnforcement<T>> enforcements)
+      throws Exception {
+    if (inputBundle != null) {
+      for (WindowedValue<T> value : inputBundle.getElements()) {
+        for (ModelEnforcement<T> enforcement : enforcements) {
+          enforcement.beforeElement(value);
+        }
+
+        evaluator.processElement(value);
+
+        for (ModelEnforcement<T> enforcement : enforcements) {
+          enforcement.afterElement(value);
+        }
+      }
+    }
+  }
+
+  /**
+   * Finishes processing the input bundle and commits the result using the
+   * {@link CompletionCallback}, then passes the committed outputs to each
+   * {@link ModelEnforcement}.
+   *
+   * @return the {@link InProcessTransformResult} produced by
+   *     {@link TransformEvaluator#finishBundle()}
+   */
+  private InProcessTransformResult finishBundle(
+      TransformEvaluator<T> evaluator, Collection<ModelEnforcement<T>> enforcements)
+      throws Exception {
+    InProcessTransformResult result = evaluator.finishBundle();
+    CommittedResult outputs = onComplete.handleResult(inputBundle, result);
+    for (ModelEnforcement<T> enforcement : enforcements) {
+      enforcement.afterFinish(inputBundle, result, outputs.getOutputs());
+    }
+    return result;
+  }
+
+  /**
+   * Returns the thread this {@link TransformExecutor} was executed on, or {@code null} if
+   * {@link #call()} has not been invoked yet.
+   *
+   * <p>The thread is recorded when execution begins and is never cleared. It therefore remains
+   * visible after execution has finished.
+   */
+  @Nullable
+  public Thread getThread() {
+    return thread.get();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java
new file mode 100644
index 0000000..837b858
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+/**
+ * Runs {@link TransformExecutor TransformExecutors} and tracks their completion, limiting how many
+ * run at once in whatever way suits the {@link StepAndKey} the executors belong to.
+ */
+interface TransformExecutorService {
+  /**
+   * Arranges for {@code executor} to be run at some point in the future.
+   *
+   * @param executor the work to run
+   */
+  void schedule(TransformExecutor<?> executor);
+
+  /**
+   * Records that {@code executor} has finished running. Implementations may respond by starting
+   * further {@link TransformExecutor TransformExecutors}.
+   *
+   * @param executor the work that has finished
+   */
+  void complete(TransformExecutor<?> executor);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
new file mode 100644
index 0000000..087b7c2
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import com.google.common.base.MoreObjects;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Static factory methods for constructing instances of {@link TransformExecutorService}.
+ */
+final class TransformExecutorServices {
+  private TransformExecutorServices() {
+    // Do not instantiate
+  }
+
+  /**
+   * Returns a {@link TransformExecutorService} that evaluates
+   * {@link TransformExecutor TransformExecutors} in parallel.
+   *
+   * @param executor the {@link ExecutorService} that runs submitted work
+   * @param scheduled map in which work is registered when it is submitted and from which it is
+   *     removed when it completes
+   */
+  public static TransformExecutorService parallel(
+      ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
+    return new ParallelEvaluationState(executor, scheduled);
+  }
+
+  /**
+   * Returns a {@link TransformExecutorService} that evaluates
+   * {@link TransformExecutor TransformExecutors} serially, in the order they were scheduled.
+   *
+   * @param executor the {@link ExecutorService} that runs submitted work
+   * @param scheduled map in which work is registered when it is submitted and from which it is
+   *     removed when it completes
+   */
+  public static TransformExecutorService serial(
+      ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
+    return new SerialEvaluationState(executor, scheduled);
+  }
+
+  /**
+   * A {@link TransformExecutorService} with unlimited parallelism. Any {@link TransformExecutor}
+   * scheduled will be immediately submitted to the {@link ExecutorService}.
+   *
+   * <p>A principal use of this is for the evaluation of an unkeyed Step. Unkeyed computations are
+   * processed in parallel.
+   */
+  private static class ParallelEvaluationState implements TransformExecutorService {
+    private final ExecutorService executor;
+    private final Map<TransformExecutor<?>, Boolean> scheduled;
+
+    private ParallelEvaluationState(
+        ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
+      this.executor = executor;
+      this.scheduled = scheduled;
+    }
+
+    @Override
+    public void schedule(TransformExecutor<?> work) {
+      // Register before submitting. The work may run to completion and call complete(), which
+      // removes it from the map, before submit() returns. Registering afterwards would then leave
+      // a stale entry behind.
+      scheduled.put(work, true);
+      executor.submit(work);
+    }
+
+    @Override
+    public void complete(TransformExecutor<?> completed) {
+      scheduled.remove(completed);
+    }
+  }
+
+  /**
+   * A {@link TransformExecutorService} with a single work queue. Any {@link TransformExecutor}
+   * scheduled will be placed on the work queue. Only one item of work will be submitted to the
+   * {@link ExecutorService} at any time.
+   *
+   * <p>A principal use of this is for the serial evaluation of a (Step, Key) pair.
+   * Keyed computations are processed serially per step.
+   */
+  private static class SerialEvaluationState implements TransformExecutorService {
+    private final ExecutorService executor;
+    private final Map<TransformExecutor<?>, Boolean> scheduled;
+
+    /**
+     * The work currently submitted to the executor, or {@code null} if none. Only ever set to a
+     * non-null value while holding this object's lock; cleared only by {@link #complete}.
+     */
+    private final AtomicReference<TransformExecutor<?>> currentlyEvaluating;
+    private final Queue<TransformExecutor<?>> workQueue;
+
+    private SerialEvaluationState(
+        ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
+      this.scheduled = scheduled;
+      this.executor = executor;
+      this.currentlyEvaluating = new AtomicReference<>();
+      this.workQueue = new ConcurrentLinkedQueue<>();
+    }
+
+    /**
+     * Schedules the work, adding it to the work queue if there is a bundle currently being
+     * evaluated and scheduling it immediately otherwise.
+     */
+    @Override
+    public void schedule(TransformExecutor<?> work) {
+      workQueue.offer(work);
+      updateCurrentlyEvaluating();
+    }
+
+    /**
+     * Marks {@code completed} as finished and starts the next queued work, if any.
+     *
+     * @throws IllegalStateException if {@code completed} is not the work currently evaluating
+     */
+    @Override
+    public void complete(TransformExecutor<?> completed) {
+      if (!currentlyEvaluating.compareAndSet(completed, null)) {
+        throw new IllegalStateException(
+            "Finished work "
+                + completed
+                + " but could not complete due to unexpected currently executing "
+                + currentlyEvaluating.get());
+      }
+      scheduled.remove(completed);
+      updateCurrentlyEvaluating();
+    }
+
+    /**
+     * If nothing is currently evaluating, submits the oldest queued work (if any).
+     *
+     * <p>The state is re-checked under the lock. Every transition to a non-null value happens
+     * under that lock, and {@link #complete} only clears a non-null value. Once the re-check
+     * observes {@code null}, the polled head of the queue can therefore always be started. It
+     * never has to be put back at the tail, which would reorder queued work.
+     */
+    private void updateCurrentlyEvaluating() {
+      if (currentlyEvaluating.get() == null) {
+        // Only synchronize if we need to update what's currently evaluating
+        synchronized (this) {
+          if (currentlyEvaluating.get() == null) {
+            TransformExecutor<?> newWork = workQueue.poll();
+            if (newWork != null) {
+              currentlyEvaluating.set(newWork);
+              scheduled.put(newWork, true);
+              executor.submit(newWork);
+            }
+          }
+        }
+      }
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(SerialEvaluationState.class)
+          .add("currentlyEvaluating", currentlyEvaluating)
+          .add("workQueue", workQueue)
+          .toString();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
new file mode 100644
index 0000000..7a95c9f
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.io.Read.Unbounded;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
+import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators}
+ * for the {@link Unbounded Read.Unbounded} primitive {@link PTransform}.
+ */
+class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
+  /*
+   * An evaluator for a Source is stateful, to ensure the CheckpointMark is properly persisted.
+   * Evaluators are cached here to ensure that the checkpoint mark is appropriately reused
+   * and any splits are honored.
+   */
+  private final ConcurrentMap<EvaluatorKey, Queue<? extends UnboundedReadEvaluator<?>>>
+      sourceEvaluators = new ConcurrentHashMap<>();
+
+  /**
+   * Returns the next available evaluator for {@code application}. If the queue of evaluators for
+   * the application is empty, returns an {@link EmptyTransformEvaluator} instead. The input
+   * bundle is ignored.
+   */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  @Override
+  public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> application,
+      @Nullable CommittedBundle<?> inputBundle, InProcessEvaluationContext evaluationContext) {
+    return getTransformEvaluator((AppliedPTransform) application, evaluationContext);
+  }
+
+  private <OutputT> TransformEvaluator<?> getTransformEvaluator(
+      final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
+      final InProcessEvaluationContext evaluationContext) {
+    UnboundedReadEvaluator<?> currentEvaluator =
+        getTransformEvaluatorQueue(transform, evaluationContext).poll();
+    if (currentEvaluator == null) {
+      // No evaluator is waiting in the queue (e.g. all are currently handed out).
+      return EmptyTransformEvaluator.create(transform);
+    }
+    return currentEvaluator;
+  }
+
+  /**
+   * Get the queue of {@link TransformEvaluator TransformEvaluators} that produce elements for the
+   * provided application of {@link Unbounded Read.Unbounded}, initializing it if required.
+   *
+   * <p>This method is thread-safe, and will only produce new evaluators if no other invocation has
+   * already done so.
+   */
+  @SuppressWarnings("unchecked")
+  private <OutputT> Queue<UnboundedReadEvaluator<OutputT>> getTransformEvaluatorQueue(
+      final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
+      final InProcessEvaluationContext evaluationContext) {
+    // Key by the application and the context the evaluation is occurring in (which call to
+    // Pipeline#run).
+    EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
+    Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue =
+        (Queue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
+    if (evaluatorQueue == null) {
+      Queue<UnboundedReadEvaluator<OutputT>> newQueue = new ConcurrentLinkedQueue<>();
+      // putIfAbsent returns the queue another thread installed first, if any.
+      Queue<UnboundedReadEvaluator<OutputT>> existing =
+          (Queue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.putIfAbsent(key, newQueue);
+      if (existing == null) {
+        // We installed the queue, so seed it with the initial evaluator for this transform.
+        UnboundedSource<OutputT, ?> source = transform.getTransform().getSource();
+        UnboundedReadEvaluator<OutputT> evaluator =
+            new UnboundedReadEvaluator<OutputT>(transform, evaluationContext, source, newQueue);
+        newQueue.offer(evaluator);
+        evaluatorQueue = newQueue;
+      } else {
+        evaluatorQueue = existing;
+      }
+    }
+    return evaluatorQueue;
+  }
+
+  /**
+   * A {@link UnboundedReadEvaluator} produces elements from an underlying {@link UnboundedSource},
+   * discarding all input elements. Within the call to {@link #finishBundle()}, the evaluator
+   * creates the {@link UnboundedReader} and consumes some currently available input.
+   *
+   * <p>Calls to {@link UnboundedReadEvaluator} are not internally thread-safe, and should only be
+   * used by a single thread at a time. Each {@link UnboundedReadEvaluator} maintains its own
+   * checkpoint, and constructs its reader from the current checkpoint in each call to
+   * {@link #finishBundle()}.
+   */
+  private static class UnboundedReadEvaluator<OutputT> implements TransformEvaluator<Object> {
+    /** Upper bound on the number of elements read in a single call to finishBundle. */
+    private static final int ARBITRARY_MAX_ELEMENTS = 10;
+    private final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform;
+    private final InProcessEvaluationContext evaluationContext;
+    /** The queue this evaluator returns itself to after a successful bundle. */
+    private final Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue;
+    /**
+     * The source being read from by this {@link UnboundedReadEvaluator}. This may not be the same
+     * source as derived from {@link #transform} due to splitting.
+     */
+    private final UnboundedSource<OutputT, ?> source;
+    /** The checkpoint to resume from; {@code null} before the first bundle. */
+    private CheckpointMark checkpointMark;
+
+    public UnboundedReadEvaluator(
+        AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
+        InProcessEvaluationContext evaluationContext,
+        UnboundedSource<OutputT, ?> source,
+        Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue) {
+      this.transform = transform;
+      this.evaluationContext = evaluationContext;
+      this.evaluatorQueue = evaluatorQueue;
+      this.source = source;
+      this.checkpointMark = null;
+    }
+
+    /** Input elements are ignored; this evaluator only produces output. */
+    @Override
+    public void processElement(WindowedValue<Object> element) {}
+
+    /**
+     * Reads up to {@link #ARBITRARY_MAX_ELEMENTS} elements into a new root bundle. It then stores
+     * and finalizes the reader's checkpoint and returns this evaluator to its queue.
+     *
+     * <p>NOTE(review): if reading throws, this evaluator is not returned to the queue. Later
+     * requests for this application would then receive an {@link EmptyTransformEvaluator}.
+     * Confirm this is intended.
+     */
+    @Override
+    public InProcessTransformResult finishBundle() throws IOException {
+      UncommittedBundle<OutputT> output = evaluationContext.createRootBundle(transform.getOutput());
+      try (UnboundedReader<OutputT> reader =
+          createReader(source, evaluationContext.getPipelineOptions())) {
+        int numElements = 0;
+        if (reader.start()) {
+          do {
+            output.add(
+                WindowedValue.timestampedValueInGlobalWindow(
+                    reader.getCurrent(), reader.getCurrentTimestamp()));
+            numElements++;
+          } while (numElements < ARBITRARY_MAX_ELEMENTS && reader.advance());
+        }
+        checkpointMark = reader.getCheckpointMark();
+        checkpointMark.finalizeCheckpoint();
+        // TODO: When exercising create initial splits, make this the minimum watermark across all
+        // existing readers
+        StepTransformResult result =
+            StepTransformResult.withHold(transform, reader.getWatermark())
+                .addOutput(output)
+                .build();
+        // Make this evaluator (and its updated checkpoint) available for the next bundle.
+        evaluatorQueue.offer(this);
+        return result;
+      }
+    }
+
+    /** Creates a reader for {@code source} that resumes from the current checkpoint, if any. */
+    private <CheckpointMarkT extends CheckpointMark> UnboundedReader<OutputT> createReader(
+        UnboundedSource<OutputT, CheckpointMarkT> source, PipelineOptions options) {
+      @SuppressWarnings("unchecked")
+      CheckpointMarkT mark = (CheckpointMarkT) checkpointMark;
+      return source.createReader(options, mark);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
new file mode 100644
index 0000000..ffaf3fa
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.PCollectionViewWriter;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
+ * {@link CreatePCollectionView} primitive {@link PTransform}.
+ *
+ * <p>The {@link ViewEvaluatorFactory} produces {@link TransformEvaluator TransformEvaluators} for
+ * the {@link WriteView} {@link PTransform}, which is part of the
+ * {@link InProcessCreatePCollectionView} composite transform. This transform is an override for the
+ * {@link CreatePCollectionView} transform that applies windowing and triggers before the view is
+ * written.
+ */
+class ViewEvaluatorFactory implements TransformEvaluatorFactory {
+ @Override
+ public <T> TransformEvaluator<T> forApplication(
+ AppliedPTransform<?, ?, ?> application,
+ InProcessPipelineRunner.CommittedBundle<?> inputBundle,
+ InProcessEvaluationContext evaluationContext) {
+ @SuppressWarnings({"cast", "unchecked", "rawtypes"})
+ TransformEvaluator<T> evaluator = createEvaluator(
+ (AppliedPTransform) application, evaluationContext);
+ return evaluator;
+ }
+
+ private <InT, OuT> TransformEvaluator<Iterable<InT>> createEvaluator(
+ final AppliedPTransform<PCollection<Iterable<InT>>, PCollectionView<OuT>, WriteView<InT, OuT>>
+ application,
+ InProcessEvaluationContext context) {
+ PCollection<Iterable<InT>> input = application.getInput();
+ final PCollectionViewWriter<InT, OuT> writer =
+ context.createPCollectionViewWriter(input, application.getOutput());
+ return new TransformEvaluator<Iterable<InT>>() {
+ private final List<WindowedValue<InT>> elements = new ArrayList<>();
+
+ @Override
+ public void processElement(WindowedValue<Iterable<InT>> element) {
+ for (InT input : element.getValue()) {
+ elements.add(element.withValue(input));
+ }
+ }
+
+ @Override
+ public InProcessTransformResult finishBundle() {
+ writer.add(elements);
+ return StepTransformResult.withoutHold(application).build();
+ }
+ };
+ }
+
+ public static class InProcessViewOverrideFactory implements PTransformOverrideFactory {
+ @Override
+ public <InputT extends PInput, OutputT extends POutput>
+ PTransform<InputT, OutputT> override(PTransform<InputT, OutputT> transform) {
+ if (transform instanceof CreatePCollectionView) {
+
+ }
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ PTransform<InputT, OutputT> createView =
+ (PTransform<InputT, OutputT>)
+ new InProcessCreatePCollectionView<>((CreatePCollectionView) transform);
+ return createView;
+ }
+ }
+
+ /**
+ * An in-process override for {@link CreatePCollectionView}.
+ */
+ private static class InProcessCreatePCollectionView<ElemT, ViewT>
+ extends ForwardingPTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
+ private final CreatePCollectionView<ElemT, ViewT> og;
+
+ private InProcessCreatePCollectionView(CreatePCollectionView<ElemT, ViewT> og) {
+ this.og = og;
+ }
+
+ @Override
+ public PCollectionView<ViewT> apply(PCollection<ElemT> input) {
+ return input.apply(WithKeys.<Void, ElemT>of((Void) null))
+ .setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()))
+ .apply(GroupByKey.<Void, ElemT>create())
+ .apply(Values.<Iterable<ElemT>>create())
+ .apply(new WriteView<ElemT, ViewT>(og));
+ }
+
+ @Override
+ protected PTransform<PCollection<ElemT>, PCollectionView<ViewT>> delegate() {
+ return og;
+ }
+ }
+
+ /**
+ * An in-process implementation of the {@link CreatePCollectionView} primitive.
+ *
+ * This implementation requires the input {@link PCollection} to be an iterable, which is provided
+ * to {@link PCollectionView#fromIterableInternal(Iterable)}.
+ */
+ public static final class WriteView<ElemT, ViewT>
+ extends PTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>> {
+ private final CreatePCollectionView<ElemT, ViewT> og;
+
+ WriteView(CreatePCollectionView<ElemT, ViewT> og) {
+ this.og = og;
+ }
+
+ @Override
+ public PCollectionView<ViewT> apply(PCollection<Iterable<ElemT>> input) {
+ return og.getView();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
new file mode 100644
index 0000000..4a3a517
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowingStrategy;

import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Ordering;

import org.joda.time.Instant;

import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
+
+/**
+ * Executes callbacks that occur based on the progression of the watermark per-step.
+ *
+ * <p>Callbacks are registered by calls to
+ * {@link #callOnGuaranteedFiring(AppliedPTransform, BoundedWindow, WindowingStrategy, Runnable)},
+ * and are executed after a call to {@link #fireForWatermark(AppliedPTransform, Instant)} with the
+ * same {@link AppliedPTransform} and a watermark sufficient to ensure that the trigger for the
+ * windowing strategy would have been produced.
+ *
+ * <p>NOTE: {@link WatermarkCallbackExecutor} does not track the latest observed watermark for any
+ * {@link AppliedPTransform} - any call to
+ * {@link #callOnGuaranteedFiring(AppliedPTransform, BoundedWindow, WindowingStrategy, Runnable)}
+ * that could have potentially already fired should be followed by a call to
+ * {@link #fireForWatermark(AppliedPTransform, Instant)} for the same transform with the current
+ * value of the watermark.
+ */
+class WatermarkCallbackExecutor {
+ /**
+ * Create a new {@link WatermarkCallbackExecutor}.
+ */
+ public static WatermarkCallbackExecutor create() {
+ return new WatermarkCallbackExecutor();
+ }
+
+ private final ConcurrentMap<AppliedPTransform<?, ?, ?>, PriorityQueue<WatermarkCallback>>
+ callbacks;
+ private final ExecutorService executor;
+
+ private WatermarkCallbackExecutor() {
+ this.callbacks = new ConcurrentHashMap<>();
+ this.executor = Executors.newSingleThreadExecutor();
+ }
+
+ /**
+ * Execute the provided {@link Runnable} after the next call to
+ * {@link #fireForWatermark(AppliedPTransform, Instant)} where the window is guaranteed to have
+ * produced output.
+ */
+ public void callOnGuaranteedFiring(
+ AppliedPTransform<?, ?, ?> step,
+ BoundedWindow window,
+ WindowingStrategy<?, ?> windowingStrategy,
+ Runnable runnable) {
+ WatermarkCallback callback =
+ WatermarkCallback.onGuaranteedFiring(window, windowingStrategy, runnable);
+
+ PriorityQueue<WatermarkCallback> callbackQueue = callbacks.get(step);
+ if (callbackQueue == null) {
+ callbackQueue = new PriorityQueue<>(11, new CallbackOrdering());
+ if (callbacks.putIfAbsent(step, callbackQueue) != null) {
+ callbackQueue = callbacks.get(step);
+ }
+ }
+
+ synchronized (callbackQueue) {
+ callbackQueue.offer(callback);
+ }
+ }
+
+ /**
+ * Schedule all pending callbacks that must have produced output by the time of the provided
+ * watermark.
+ */
+ public void fireForWatermark(AppliedPTransform<?, ?, ?> step, Instant watermark) {
+ PriorityQueue<WatermarkCallback> callbackQueue = callbacks.get(step);
+ if (callbackQueue == null) {
+ return;
+ }
+ synchronized (callbackQueue) {
+ while (!callbackQueue.isEmpty() && callbackQueue.peek().shouldFire(watermark)) {
+ executor.submit(callbackQueue.poll().getCallback());
+ }
+ }
+ }
+
+ private static class WatermarkCallback {
+ public static <W extends BoundedWindow> WatermarkCallback onGuaranteedFiring(
+ BoundedWindow window, WindowingStrategy<?, W> strategy, Runnable callback) {
+ @SuppressWarnings("unchecked")
+ Instant firingAfter =
+ strategy.getTrigger().getSpec().getWatermarkThatGuaranteesFiring((W) window);
+ return new WatermarkCallback(firingAfter, callback);
+ }
+
+ private final Instant fireAfter;
+ private final Runnable callback;
+
+ private WatermarkCallback(Instant fireAfter, Runnable callback) {
+ this.fireAfter = fireAfter;
+ this.callback = callback;
+ }
+
+ public boolean shouldFire(Instant currentWatermark) {
+ return currentWatermark.isAfter(fireAfter)
+ || currentWatermark.equals(BoundedWindow.TIMESTAMP_MAX_VALUE);
+ }
+
+ public Runnable getCallback() {
+ return callback;
+ }
+ }
+
+ private static class CallbackOrdering extends Ordering<WatermarkCallback> {
+ @Override
+ public int compare(WatermarkCallback left, WatermarkCallback right) {
+ return ComparisonChain.start()
+ .compare(left.fireAfter, right.fireAfter)
+ .compare(left.callback, right.callback, Ordering.arbitrary())
+ .result();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
new file mode 100644
index 0000000..628f94d
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.Window.Bound;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.joda.time.Instant;
+
+import java.util.Collection;
+
+import javax.annotation.Nullable;
+
+/**
+ * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
+ * {@link Bound Window.Bound} primitive {@link PTransform}.
+ */
+class WindowEvaluatorFactory implements TransformEvaluatorFactory {
+
+ @Override
+ public <InputT> TransformEvaluator<InputT> forApplication(
+ AppliedPTransform<?, ?, ?> application,
+ @Nullable CommittedBundle<?> inputBundle,
+ InProcessEvaluationContext evaluationContext)
+ throws Exception {
+ return createTransformEvaluator(
+ (AppliedPTransform) application, inputBundle, evaluationContext);
+ }
+
+ private <InputT> TransformEvaluator<InputT> createTransformEvaluator(
+ AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>> transform,
+ CommittedBundle<?> inputBundle,
+ InProcessEvaluationContext evaluationContext) {
+ WindowFn<? super InputT, ?> fn = transform.getTransform().getWindowFn();
+ UncommittedBundle<InputT> outputBundle =
+ evaluationContext.createBundle(inputBundle, transform.getOutput());
+ if (fn == null) {
+ return PassthroughTransformEvaluator.create(transform, outputBundle);
+ }
+ return new WindowIntoEvaluator<>(transform, fn, outputBundle);
+ }
+
+ private static class WindowIntoEvaluator<InputT> implements TransformEvaluator<InputT> {
+ private final AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>>
+ transform;
+ private final WindowFn<InputT, ?> windowFn;
+ private final UncommittedBundle<InputT> outputBundle;
+
+ @SuppressWarnings("unchecked")
+ public WindowIntoEvaluator(
+ AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>> transform,
+ WindowFn<? super InputT, ?> windowFn,
+ UncommittedBundle<InputT> outputBundle) {
+ this.outputBundle = outputBundle;
+ this.transform = transform;
+ // Safe contravariant cast
+ this.windowFn = (WindowFn<InputT, ?>) windowFn;
+ }
+
+ @Override
+ public void processElement(WindowedValue<InputT> element) throws Exception {
+ Collection<? extends BoundedWindow> windows = assignWindows(windowFn, element);
+ outputBundle.add(
+ WindowedValue.<InputT>of(
+ element.getValue(), element.getTimestamp(), windows, PaneInfo.NO_FIRING));
+ }
+
+ private <W extends BoundedWindow> Collection<? extends BoundedWindow> assignWindows(
+ WindowFn<InputT, W> windowFn, WindowedValue<InputT> element) throws Exception {
+ WindowFn<InputT, W>.AssignContext assignContext =
+ new InProcessAssignContext<>(windowFn, element);
+ Collection<? extends BoundedWindow> windows = windowFn.assignWindows(assignContext);
+ return windows;
+ }
+
+ @Override
+ public InProcessTransformResult finishBundle() throws Exception {
+ return StepTransformResult.withoutHold(transform).addOutput(outputBundle).build();
+ }
+ }
+
+ private static class InProcessAssignContext<InputT, W extends BoundedWindow>
+ extends WindowFn<InputT, W>.AssignContext {
+ private final WindowedValue<InputT> value;
+
+ public InProcessAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) {
+ fn.super();
+ this.value = value;
+ }
+
+ @Override
+ public InputT element() {
+ return value.getValue();
+ }
+
+ @Override
+ public Instant timestamp() {
+ return value.getTimestamp();
+ }
+
+ @Override
+ public Collection<? extends BoundedWindow> windows() {
+ return value.getWindows();
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java
new file mode 100644
index 0000000..d290a4b
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.theInstance;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.io.AvroIO;
+import org.apache.beam.sdk.io.AvroIOTest;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.File;
+
+/**
+ * Tests for {@link AvroIOShardedWriteFactory}.
+ */
+@RunWith(JUnit4.class)
+public class AvroIOShardedWriteFactoryTest {
+
+ @Rule public TemporaryFolder tmp = new TemporaryFolder();
+ private AvroIOShardedWriteFactory factory;
+
+ @Before
+ public void setup() {
+ factory = new AvroIOShardedWriteFactory();
+ }
+
+ @Test
+ public void originalWithoutShardingReturnsOriginal() throws Exception {
+ File file = tmp.newFile("foo");
+ PTransform<PCollection<String>, PDone> original =
+ AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath()).withoutSharding();
+ PTransform<PCollection<String>, PDone> overridden = factory.override(original);
+
+ assertThat(overridden, theInstance(original));
+ }
+
+ @Test
+ public void originalShardingNotSpecifiedReturnsOriginal() throws Exception {
+ File file = tmp.newFile("foo");
+ PTransform<PCollection<String>, PDone> original =
+ AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath());
+ PTransform<PCollection<String>, PDone> overridden = factory.override(original);
+
+ assertThat(overridden, theInstance(original));
+ }
+
+ @Test
+ public void originalShardedToOneReturnsExplicitlySharded() throws Exception {
+ File file = tmp.newFile("foo");
+ AvroIO.Write.Bound<String> original =
+ AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath()).withNumShards(1);
+ PTransform<PCollection<String>, PDone> overridden = factory.override(original);
+
+ assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(original)));
+
+ TestPipeline p = TestPipeline.create();
+ String[] elems = new String[] {"foo", "bar", "baz"};
+ p.apply(Create.<String>of(elems)).apply(overridden);
+
+ file.delete();
+
+ p.run();
+ AvroIOTest.assertTestOutputs(elems, 1, file.getAbsolutePath(), original.getShardNameTemplate());
+ }
+
+ @Test
+ public void originalShardedToManyReturnsExplicitlySharded() throws Exception {
+ File file = tmp.newFile("foo");
+ AvroIO.Write.Bound<String> original =
+ AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath()).withNumShards(3);
+ PTransform<PCollection<String>, PDone> overridden = factory.override(original);
+
+ assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(original)));
+
+ TestPipeline p = TestPipeline.create();
+ String[] elems = new String[] {"foo", "bar", "baz", "spam", "ham", "eggs"};
+ p.apply(Create.<String>of(elems)).apply(overridden);
+
+ file.delete();
+ p.run();
+ AvroIOTest.assertTestOutputs(elems, 3, file.getAbsolutePath(), original.getShardNameTemplate());
+ }
+}