You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/04/29 23:56:04 UTC
[06/17] incubator-beam git commit: Move InProcessRunner to its own
module
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineOptions.java
deleted file mode 100644
index bdc525a..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineOptions.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.ApplicationNameOptions;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.Hidden;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.Validation.Required;
-import org.apache.beam.sdk.transforms.PTransform;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * Options that can be used to configure the {@link InProcessPipelineRunner}.
- */
-public interface InProcessPipelineOptions extends PipelineOptions, ApplicationNameOptions {
- /**
- * Gets the {@link ExecutorServiceFactory} to use to create instances of {@link ExecutorService}
- * to execute {@link PTransform PTransforms}.
- *
- * <p>Note that each {@link ExecutorService} returned by the factory must ensure that
- * it cannot enter a state in which it will not schedule additional pending work unless currently
- * scheduled work completes, as this may cause the {@link Pipeline} to cease processing.
- *
- * <p>Defaults to a {@link CachedThreadPoolExecutorServiceFactory}, which produces instances of
- * {@link Executors#newCachedThreadPool()}.
- */
- @JsonIgnore
- @Required
- @Hidden
- @Default.InstanceFactory(CachedThreadPoolExecutorServiceFactory.class)
- ExecutorServiceFactory getExecutorServiceFactory();
-
- void setExecutorServiceFactory(ExecutorServiceFactory executorService);
-
- /**
- * Gets the {@link Clock} used by this pipeline. The clock is used in place of accessing the
- * system time when time values are required by the evaluator.
- */
- @Default.InstanceFactory(NanosOffsetClock.Factory.class)
- @JsonIgnore
- @Required
- @Hidden
- @Description(
- "The processing time source used by the pipeline. When the current time is "
- + "needed by the evaluator, the result of clock#now() is used.")
- Clock getClock();
-
- void setClock(Clock clock);
-
- @Default.Boolean(false)
- @Description(
- "If the pipeline should shut down producers which have reached the maximum "
- + "representable watermark. If this is set to true, a pipeline in which all PTransforms "
- + "have reached the maximum watermark will be shut down, even if there are unbounded "
- + "sources that could produce additional (late) data. By default, if the pipeline "
- + "contains any unbounded PCollections, it will run until explicitly shut down.")
- boolean isShutdownUnboundedProducersWithMaxWatermark();
-
- void setShutdownUnboundedProducersWithMaxWatermark(boolean shutdown);
-
- @Default.Boolean(true)
- @Description(
- "If the pipeline should block awaiting completion of the pipeline. If set to true, "
- + "a call to Pipeline#run() will block until all PTransforms are complete. Otherwise, "
- + "the Pipeline will execute asynchronously. If set to false, the completion of the "
- + "pipeline can be awaited on by use of InProcessPipelineResult#awaitCompletion().")
- boolean isBlockOnRun();
-
- void setBlockOnRun(boolean b);
-
- @Default.Boolean(true)
- @Description(
- "Controls whether the runner should ensure that all of the elements of every "
- + "PCollection are not mutated. PTransforms are not permitted to mutate input elements "
- + "at any point, or output elements after they are output.")
- boolean isTestImmutability();
-
- void setTestImmutability(boolean test);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java
deleted file mode 100644
index 7897f2e..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java
+++ /dev/null
@@ -1,370 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.io.AvroIO;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.AggregatorPipelineExtractor;
-import org.apache.beam.sdk.runners.AggregatorRetrievalException;
-import org.apache.beam.sdk.runners.AggregatorValues;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly;
-import org.apache.beam.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOverrideFactory;
-import org.apache.beam.sdk.runners.inprocess.ViewEvaluatorFactory.InProcessViewOverrideFactory;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
-import org.apache.beam.sdk.util.MapAggregatorValues;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.common.Counter;
-import org.apache.beam.sdk.util.common.CounterSet;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.PValue;
-
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
-import org.joda.time.Instant;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-
-import javax.annotation.Nullable;
-
-/**
- * An In-Memory implementation of the Dataflow Programming Model. Supports Unbounded
- * {@link PCollection PCollections}.
- */
-@Experimental
-public class InProcessPipelineRunner
- extends PipelineRunner<InProcessPipelineRunner.InProcessPipelineResult> {
- /**
- * The default set of transform overrides to use in the {@link InProcessPipelineRunner}.
- *
- * <p>A transform override must have a single-argument constructor that takes an instance of the
- * type of transform it is overriding.
- */
- @SuppressWarnings("rawtypes")
- private static Map<Class<? extends PTransform>, PTransformOverrideFactory>
- defaultTransformOverrides =
- ImmutableMap.<Class<? extends PTransform>, PTransformOverrideFactory>builder()
- .put(GroupByKey.class, new InProcessGroupByKeyOverrideFactory())
- .put(CreatePCollectionView.class, new InProcessViewOverrideFactory())
- .put(AvroIO.Write.Bound.class, new AvroIOShardedWriteFactory())
- .put(TextIO.Write.Bound.class, new TextIOShardedWriteFactory())
- .build();
-
- /**
- * Part of a {@link PCollection}. Elements are output to a bundle, which will cause them to be
- * executed by {@link PTransform PTransforms} that consume the {@link PCollection} this bundle is
- * a part of at a later point. This is an uncommitted bundle and can have elements added to it.
- *
- * @param <T> the type of elements that can be added to this bundle
- */
- public static interface UncommittedBundle<T> {
- /**
- * Returns the PCollection that the elements of this {@link UncommittedBundle} belong to.
- */
- PCollection<T> getPCollection();
-
- /**
- * Outputs an element to this bundle.
- *
- * @param element the element to add to this bundle
- * @return this bundle
- */
- UncommittedBundle<T> add(WindowedValue<T> element);
-
- /**
- * Commits this {@link UncommittedBundle}, returning an immutable {@link CommittedBundle}
- * containing all of the elements that were added to it. The {@link #add(WindowedValue)} method
- * will throw an {@link IllegalStateException} if called after a call to commit.
- * @param synchronizedProcessingTime the synchronized processing time at which this bundle was
- * committed
- */
- CommittedBundle<T> commit(Instant synchronizedProcessingTime);
- }
-
- /**
- * Part of a {@link PCollection}. Elements are output to an {@link UncommittedBundle}, which will
- * eventually be committed. Committed elements are executed by the
- * {@link PTransform PTransforms} that consume the {@link PCollection} this bundle is a part of
- * at a later point.
- * @param <T> the type of elements contained within this bundle
- */
- public static interface CommittedBundle<T> {
- /**
- * Returns the PCollection that the elements of this bundle belong to.
- */
- PCollection<T> getPCollection();
-
- /**
- * Returns the (possibly null) key that was output in the most recent {@link GroupByKey} in the
- * execution of this bundle.
- */
- @Nullable
- Object getKey();
-
- /**
- * Returns an {@link Iterable} containing all of the elements that have been added to this
- * {@link CommittedBundle}.
- */
- Iterable<WindowedValue<T>> getElements();
-
- /**
- * Returns the processing time output watermark at the time the producing {@link PTransform}
- * committed this bundle. Downstream synchronized processing time watermarks cannot progress
- * past this point before consuming this bundle.
- *
- * <p>This value is no greater than the earliest incomplete processing time or synchronized
- * processing time {@link TimerData timer} at the time this bundle was committed, including any
- * timers that fired to produce this bundle.
- */
- Instant getSynchronizedProcessingOutputWatermark();
-
- /**
- * Return a new {@link CommittedBundle} that is like this one, except calls to
- * {@link #getElements()} will return the provided elements. This bundle is unchanged.
- *
- * <p>
- * The value of the {@link #getSynchronizedProcessingOutputWatermark() synchronized processing
- * output watermark} of the returned {@link CommittedBundle} is equal to the value returned from
- * the current bundle. This is used to ensure a {@link PTransform} that could not complete
- * processing on input elements properly holds the synchronized processing time to the
- * appropriate value.
- */
- CommittedBundle<T> withElements(Iterable<WindowedValue<T>> elements);
- }
-
- /**
- * A {@link PCollectionViewWriter} is responsible for writing contents of a {@link PCollection} to
- * a storage mechanism that can be read from while constructing a {@link PCollectionView}.
- * @param <ElemT> the type of elements the input {@link PCollection} contains.
- * @param <ViewT> the type of the PCollectionView this writer writes to.
- */
- public static interface PCollectionViewWriter<ElemT, ViewT> {
- void add(Iterable<WindowedValue<ElemT>> values);
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////////
- private final InProcessPipelineOptions options;
-
- public static InProcessPipelineRunner fromOptions(PipelineOptions options) {
- return new InProcessPipelineRunner(options.as(InProcessPipelineOptions.class));
- }
-
- private InProcessPipelineRunner(InProcessPipelineOptions options) {
- this.options = options;
- }
-
- /**
- * Returns the {@link PipelineOptions} used to create this {@link InProcessPipelineRunner}.
- */
- public InProcessPipelineOptions getPipelineOptions() {
- return options;
- }
-
- @Override
- public <OutputT extends POutput, InputT extends PInput> OutputT apply(
- PTransform<InputT, OutputT> transform, InputT input) {
- PTransformOverrideFactory overrideFactory = defaultTransformOverrides.get(transform.getClass());
- if (overrideFactory != null) {
- PTransform<InputT, OutputT> customTransform = overrideFactory.override(transform);
-
- return super.apply(customTransform, input);
- }
- // If there is no override, or we should not apply the override, apply the original transform
- return super.apply(transform, input);
- }
-
- @Override
- public InProcessPipelineResult run(Pipeline pipeline) {
- ConsumerTrackingPipelineVisitor consumerTrackingVisitor = new ConsumerTrackingPipelineVisitor();
- pipeline.traverseTopologically(consumerTrackingVisitor);
- for (PValue unfinalized : consumerTrackingVisitor.getUnfinalizedPValues()) {
- unfinalized.finishSpecifying();
- }
- @SuppressWarnings("rawtypes")
- KeyedPValueTrackingVisitor keyedPValueVisitor =
- KeyedPValueTrackingVisitor.create(
- ImmutableSet.<Class<? extends PTransform>>of(
- GroupByKey.class, InProcessGroupByKeyOnly.class));
- pipeline.traverseTopologically(keyedPValueVisitor);
-
- InProcessEvaluationContext context =
- InProcessEvaluationContext.create(
- getPipelineOptions(),
- createBundleFactory(getPipelineOptions()),
- consumerTrackingVisitor.getRootTransforms(),
- consumerTrackingVisitor.getValueToConsumers(),
- consumerTrackingVisitor.getStepNames(),
- consumerTrackingVisitor.getViews());
-
- // independent executor service for each run
- ExecutorService executorService =
- context.getPipelineOptions().getExecutorServiceFactory().create();
- InProcessExecutor executor =
- ExecutorServiceParallelExecutor.create(
- executorService,
- consumerTrackingVisitor.getValueToConsumers(),
- keyedPValueVisitor.getKeyedPValues(),
- TransformEvaluatorRegistry.defaultRegistry(),
- defaultModelEnforcements(options),
- context);
- executor.start(consumerTrackingVisitor.getRootTransforms());
-
- Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
- new AggregatorPipelineExtractor(pipeline).getAggregatorSteps();
- InProcessPipelineResult result =
- new InProcessPipelineResult(executor, context, aggregatorSteps);
- if (options.isBlockOnRun()) {
- try {
- result.awaitCompletion();
- } catch (UserCodeException userException) {
- throw new PipelineExecutionException(userException.getCause());
- } catch (Throwable t) {
- Throwables.propagate(t);
- }
- }
- return result;
- }
-
- private Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
- defaultModelEnforcements(InProcessPipelineOptions options) {
- ImmutableMap.Builder<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
- enforcements = ImmutableMap.builder();
- Collection<ModelEnforcementFactory> parDoEnforcements = createParDoEnforcements(options);
- enforcements.put(ParDo.Bound.class, parDoEnforcements);
- enforcements.put(ParDo.BoundMulti.class, parDoEnforcements);
- return enforcements.build();
- }
-
- private Collection<ModelEnforcementFactory> createParDoEnforcements(
- InProcessPipelineOptions options) {
- ImmutableList.Builder<ModelEnforcementFactory> enforcements = ImmutableList.builder();
- if (options.isTestImmutability()) {
- enforcements.add(ImmutabilityEnforcementFactory.create());
- }
- return enforcements.build();
- }
-
- private BundleFactory createBundleFactory(InProcessPipelineOptions pipelineOptions) {
- BundleFactory bundleFactory = InProcessBundleFactory.create();
- if (pipelineOptions.isTestImmutability()) {
- bundleFactory = ImmutabilityCheckingBundleFactory.create(bundleFactory);
- }
- return bundleFactory;
- }
-
- /**
- * The result of running a {@link Pipeline} with the {@link InProcessPipelineRunner}.
- *
- * <p>Exposes the pipeline state and aggregator values, and can block until completion.
- */
- public static class InProcessPipelineResult implements PipelineResult {
- private final InProcessExecutor executor;
- private final InProcessEvaluationContext evaluationContext;
- private final Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps;
- private State state;
-
- private InProcessPipelineResult(
- InProcessExecutor executor,
- InProcessEvaluationContext evaluationContext,
- Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps) {
- this.executor = executor;
- this.evaluationContext = evaluationContext;
- this.aggregatorSteps = aggregatorSteps;
- // Only ever constructed after the executor has started.
- this.state = State.RUNNING;
- }
-
- @Override
- public State getState() {
- return state;
- }
-
- @Override
- public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
- throws AggregatorRetrievalException {
- CounterSet counters = evaluationContext.getCounters();
- Collection<PTransform<?, ?>> steps = aggregatorSteps.get(aggregator);
- Map<String, T> stepValues = new HashMap<>();
- for (AppliedPTransform<?, ?, ?> transform : evaluationContext.getSteps()) {
- if (steps.contains(transform.getTransform())) {
- String stepName =
- String.format(
- "user-%s-%s", evaluationContext.getStepName(transform), aggregator.getName());
- Counter<T> counter = (Counter<T>) counters.getExistingCounter(stepName);
- if (counter != null) {
- stepValues.put(transform.getFullName(), counter.getAggregate());
- }
- }
- }
- return new MapAggregatorValues<>(stepValues);
- }
-
- /**
- * Blocks until the {@link Pipeline} execution represented by this
- * {@link InProcessPipelineResult} is complete, returning the terminal state.
- *
- * <p>If the pipeline terminates abnormally by throwing an exception, this will rethrow the
- * exception. Future calls to {@link #getState()} will return
- * {@link org.apache.beam.sdk.PipelineResult.State#FAILED}.
- *
- * <p>NOTE: if the {@link Pipeline} contains an {@link IsBounded#UNBOUNDED unbounded}
- * {@link PCollection}, and the {@link PipelineRunner} was created with
- * {@link InProcessPipelineOptions#isShutdownUnboundedProducersWithMaxWatermark()} set to false,
- * this method will never return.
- *
- * See also {@link InProcessExecutor#awaitCompletion()}.
- */
- public State awaitCompletion() throws Throwable {
- if (!state.isTerminal()) {
- try {
- executor.awaitCompletion();
- state = State.DONE;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw e;
- } catch (Throwable t) {
- state = State.FAILED;
- throw t;
- }
- }
- return state;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessRegistrar.java
deleted file mode 100644
index 8d29d01..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessRegistrar.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
-
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableList;
-
-/**
- * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the
- * {@link InProcessPipelineRunner}.
- */
-public class InProcessRegistrar {
- private InProcessRegistrar() {}
- /**
- * Registers the {@link InProcessPipelineRunner}.
- */
- @AutoService(PipelineRunnerRegistrar.class)
- public static class InProcessRunner implements PipelineRunnerRegistrar {
- @Override
- public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
- return ImmutableList.<Class<? extends PipelineRunner<?>>>of(InProcessPipelineRunner.class);
- }
- }
-
- /**
- * Registers the {@link InProcessPipelineOptions}.
- */
- @AutoService(PipelineOptionsRegistrar.class)
- public static class InProcessOptions implements PipelineOptionsRegistrar {
- @Override
- public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
- return ImmutableList.<Class<? extends PipelineOptions>>of(InProcessPipelineOptions.class);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java
deleted file mode 100644
index fda78fc..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.apache.beam.sdk.runners.inprocess.InProcessEvaluationContext.ReadyCheckingSideInputReader;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.PCollectionViewWindow;
-import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PCollectionView;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Throwables;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.SettableFuture;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-import javax.annotation.Nullable;
-
-/**
- * An in-process container for {@link PCollectionView PCollectionViews}, which provides methods for
- * constructing {@link SideInputReader SideInputReaders} which block until a side input is
- * available and writing to a {@link PCollectionView}.
- */
-class InProcessSideInputContainer {
- private final InProcessEvaluationContext evaluationContext;
- private final Collection<PCollectionView<?>> containedViews;
- private final LoadingCache<PCollectionViewWindow<?>,
- SettableFuture<Iterable<? extends WindowedValue<?>>>> viewByWindows;
-
- /**
- * Create a new {@link InProcessSideInputContainer} with the provided views and the provided
- * context.
- */
- public static InProcessSideInputContainer create(
- InProcessEvaluationContext context, Collection<PCollectionView<?>> containedViews) {
- CacheLoader<PCollectionViewWindow<?>, SettableFuture<Iterable<? extends WindowedValue<?>>>>
- loader = new CacheLoader<PCollectionViewWindow<?>,
- SettableFuture<Iterable<? extends WindowedValue<?>>>>() {
- @Override
- public SettableFuture<Iterable<? extends WindowedValue<?>>> load(
- PCollectionViewWindow<?> view) {
- return SettableFuture.create();
- }
- };
- LoadingCache<PCollectionViewWindow<?>, SettableFuture<Iterable<? extends WindowedValue<?>>>>
- viewByWindows = CacheBuilder.newBuilder().build(loader);
- return new InProcessSideInputContainer(context, containedViews, viewByWindows);
- }
-
- private InProcessSideInputContainer(InProcessEvaluationContext context,
- Collection<PCollectionView<?>> containedViews,
- LoadingCache<PCollectionViewWindow<?>, SettableFuture<Iterable<? extends WindowedValue<?>>>>
- viewByWindows) {
- this.evaluationContext = context;
- this.containedViews = ImmutableSet.copyOf(containedViews);
- this.viewByWindows = viewByWindows;
- }
-
- /**
- * Return a view of this {@link InProcessSideInputContainer} that contains only the views in the
- * provided argument. The returned {@link ReadyCheckingSideInputReader} is unmodifiable without
- * casting, but will change as this {@link InProcessSideInputContainer} is modified.
- */
- public ReadyCheckingSideInputReader createReaderForViews(
- Collection<PCollectionView<?>> newContainedViews) {
- if (!containedViews.containsAll(newContainedViews)) {
- Set<PCollectionView<?>> currentlyContained = ImmutableSet.copyOf(containedViews);
- Set<PCollectionView<?>> newRequested = ImmutableSet.copyOf(newContainedViews);
- throw new IllegalArgumentException("Can't create a SideInputReader with unknown views "
- + Sets.difference(newRequested, currentlyContained));
- }
- return new SideInputContainerSideInputReader(newContainedViews);
- }
-
- /**
- * Write the provided values to the provided view.
- *
- * <p>The windowed values are first exploded, then for each window the pane is determined. For
- * each window, if the pane is later than the current pane stored within this container, write
- * all of the values to the container as the new values of the {@link PCollectionView}.
- *
- * <p>The provided iterable is expected to contain only a single window and pane.
- */
- public void write(PCollectionView<?> view, Iterable<? extends WindowedValue<?>> values) {
- Map<BoundedWindow, Collection<WindowedValue<?>>> valuesPerWindow =
- indexValuesByWindow(values);
- for (Map.Entry<BoundedWindow, Collection<WindowedValue<?>>> windowValues :
- valuesPerWindow.entrySet()) {
- updatePCollectionViewWindowValues(view, windowValues.getKey(), windowValues.getValue());
- }
- }
-
- /**
- * Index the provided values by all {@link BoundedWindow windows} in which they appear.
- */
- private Map<BoundedWindow, Collection<WindowedValue<?>>> indexValuesByWindow(
- Iterable<? extends WindowedValue<?>> values) {
- Map<BoundedWindow, Collection<WindowedValue<?>>> valuesPerWindow = new HashMap<>();
- for (WindowedValue<?> value : values) {
- for (BoundedWindow window : value.getWindows()) {
- Collection<WindowedValue<?>> windowValues = valuesPerWindow.get(window);
- if (windowValues == null) {
- windowValues = new ArrayList<>();
- valuesPerWindow.put(window, windowValues);
- }
- windowValues.add(value);
- }
- }
- return valuesPerWindow;
- }
-
- /**
- * Set the value of the {@link PCollectionView} in the {@link BoundedWindow} to be based on the
- * specified values, if the values are part of a later pane than currently exist within the
- * {@link PCollectionViewWindow}.
- */
- private void updatePCollectionViewWindowValues(
- PCollectionView<?> view, BoundedWindow window, Collection<WindowedValue<?>> windowValues) {
- PCollectionViewWindow<?> windowedView = PCollectionViewWindow.of(view, window);
- SettableFuture<Iterable<? extends WindowedValue<?>>> future = null;
- try {
- future = viewByWindows.get(windowedView);
- if (future.isDone()) {
- Iterator<? extends WindowedValue<?>> existingValues = future.get().iterator();
- PaneInfo newPane = windowValues.iterator().next().getPane();
- // The current value may have no elements, if no elements were produced for the window,
- // but we are receiving late data.
- if (!existingValues.hasNext()
- || newPane.getIndex() > existingValues.next().getPane().getIndex()) {
- viewByWindows.invalidate(windowedView);
- viewByWindows.get(windowedView).set(windowValues);
- }
- } else {
- future.set(windowValues);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- if (future != null && !future.isDone()) {
- future.set(Collections.<WindowedValue<?>>emptyList());
- }
- } catch (ExecutionException e) {
- Throwables.propagate(e.getCause());
- }
- }
-
- private final class SideInputContainerSideInputReader implements ReadyCheckingSideInputReader {
- private final Collection<PCollectionView<?>> readerViews;
-
- private SideInputContainerSideInputReader(Collection<PCollectionView<?>> readerViews) {
- this.readerViews = ImmutableSet.copyOf(readerViews);
- }
-
- @Override
- public boolean isReady(final PCollectionView<?> view, final BoundedWindow window) {
- checkArgument(
- readerViews.contains(view),
- "Tried to check if view %s was ready in a SideInputReader that does not contain it. "
- + "Contained views; %s",
- view,
- readerViews);
- return getViewFuture(view, window).isDone();
- }
-
- @Override
- @Nullable
- public <T> T get(final PCollectionView<T> view, final BoundedWindow window) {
- checkArgument(
- readerViews.contains(view), "calling get(PCollectionView) with unknown view: " + view);
- try {
- final Future<Iterable<? extends WindowedValue<?>>> future = getViewFuture(view, window);
- // Safe covariant cast
- @SuppressWarnings("unchecked")
- Iterable<WindowedValue<?>> values = (Iterable<WindowedValue<?>>) future.get();
- return view.fromIterableInternal(values);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return null;
- } catch (ExecutionException e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Gets the future containing the contents of the provided {@link PCollectionView} in the
- * provided {@link BoundedWindow}, setting up a callback to populate the future with empty
- * contents if necessary.
- */
- private <T> Future<Iterable<? extends WindowedValue<?>>> getViewFuture(
- final PCollectionView<T> view, final BoundedWindow window) {
- PCollectionViewWindow<T> windowedView = PCollectionViewWindow.of(view, window);
- final SettableFuture<Iterable<? extends WindowedValue<?>>> future =
- viewByWindows.getUnchecked(windowedView);
-
- WindowingStrategy<?, ?> windowingStrategy = view.getWindowingStrategyInternal();
- evaluationContext.scheduleAfterOutputWouldBeProduced(
- view, window, windowingStrategy, new WriteEmptyViewContents(view, window, future));
- return future;
- }
-
- @Override
- public <T> boolean contains(PCollectionView<T> view) {
- return readerViews.contains(view);
- }
-
- @Override
- public boolean isEmpty() {
- return readerViews.isEmpty();
- }
- }
-
- private static class WriteEmptyViewContents implements Runnable {
- private final PCollectionView<?> view;
- private final BoundedWindow window;
- private final SettableFuture<Iterable<? extends WindowedValue<?>>> future;
-
- private WriteEmptyViewContents(PCollectionView<?> view, BoundedWindow window,
- SettableFuture<Iterable<? extends WindowedValue<?>>> future) {
- this.future = future;
- this.view = view;
- this.window = window;
- }
-
- @Override
- public void run() {
- // The requested window has closed without producing elements, so reflect that in
- // the PCollectionView. If set has already been called, will do nothing.
- future.set(Collections.<WindowedValue<?>>emptyList());
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("view", view)
- .add("window", window)
- .toString();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessTimerInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessTimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessTimerInternals.java
deleted file mode 100644
index 3422efd..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessTimerInternals.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate;
-import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate.TimerUpdateBuilder;
-import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.TransformWatermarks;
-import org.apache.beam.sdk.util.TimerInternals;
-
-import org.joda.time.Instant;
-
-import javax.annotation.Nullable;
-
-/**
- * An implementation of {@link TimerInternals} where all relevant data exists in memory.
- */
-public class InProcessTimerInternals implements TimerInternals {
- private final Clock processingTimeClock;
- private final TransformWatermarks watermarks;
- private final TimerUpdateBuilder timerUpdateBuilder;
-
- public static InProcessTimerInternals create(
- Clock clock, TransformWatermarks watermarks, TimerUpdateBuilder timerUpdateBuilder) {
- return new InProcessTimerInternals(clock, watermarks, timerUpdateBuilder);
- }
-
- private InProcessTimerInternals(
- Clock clock, TransformWatermarks watermarks, TimerUpdateBuilder timerUpdateBuilder) {
- this.processingTimeClock = clock;
- this.watermarks = watermarks;
- this.timerUpdateBuilder = timerUpdateBuilder;
- }
-
- @Override
- public void setTimer(TimerData timerKey) {
- timerUpdateBuilder.setTimer(timerKey);
- }
-
- @Override
- public void deleteTimer(TimerData timerKey) {
- timerUpdateBuilder.deletedTimer(timerKey);
- }
-
- public TimerUpdate getTimerUpdate() {
- return timerUpdateBuilder.build();
- }
-
- @Override
- public Instant currentProcessingTime() {
- return processingTimeClock.now();
- }
-
- @Override
- @Nullable
- public Instant currentSynchronizedProcessingTime() {
- return watermarks.getSynchronizedProcessingInputTime();
- }
-
- @Override
- public Instant currentInputWatermarkTime() {
- return watermarks.getInputWatermark();
- }
-
- @Override
- @Nullable
- public Instant currentOutputWatermarkTime() {
- return watermarks.getOutputWatermark();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessTransformResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessTransformResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessTransformResult.java
deleted file mode 100644
index ed77f70..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessTransformResult.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.common.CounterSet;
-import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
-
-import org.joda.time.Instant;
-
-import javax.annotation.Nullable;
-
-/**
- * The result of evaluating an {@link AppliedPTransform} with a {@link TransformEvaluator}.
- */
-public interface InProcessTransformResult {
- /**
- * Returns the {@link AppliedPTransform} that produced this result.
- */
- AppliedPTransform<?, ?, ?> getTransform();
-
- /**
- * Returns the {@link UncommittedBundle (uncommitted) Bundles} output by this transform. These
- * will be committed by the evaluation context as part of completing this result.
- */
- Iterable<? extends UncommittedBundle<?>> getOutputBundles();
-
- /**
- * Returns the {@link CounterSet} used by this {@link PTransform}, or null if this transform did
- * not use a {@link CounterSet}.
- */
- @Nullable CounterSet getCounters();
-
- /**
- * Returns the Watermark Hold for the transform at the time this result was produced.
- *
- * If the transform does not set any watermark hold, returns
- * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
- */
- Instant getWatermarkHold();
-
- /**
- * Returns the State used by the transform.
- *
- * If this evaluation did not access state, this may return null.
- */
- @Nullable
- CopyOnAccessInMemoryStateInternals<?> getState();
-
- /**
- * Returns a TimerUpdateBuilder that was produced as a result of this evaluation. If the
- * evaluation was triggered due to the delivery of one or more timers, those timers must be added
- * to the builder before it is complete.
- *
- * <p>If this evaluation did not add or remove any timers, returns an empty TimerUpdate.
- */
- TimerUpdate getTimerUpdate();
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/KeyedPValueTrackingVisitor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/KeyedPValueTrackingVisitor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/KeyedPValueTrackingVisitor.java
deleted file mode 100644
index 0e6b7e8..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/KeyedPValueTrackingVisitor.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import org.apache.beam.sdk.Pipeline.PipelineVisitor;
-import org.apache.beam.sdk.runners.TransformTreeNode;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PValue;
-
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * A pipeline visitor that tracks all keyed {@link PValue PValues}. A {@link PValue} is keyed if it
- * is the result of a {@link PTransform} that produces keyed outputs. A {@link PTransform} that
- * produces keyed outputs is assumed to colocate output elements that share a key.
- *
- * <p>All {@link GroupByKey} transforms, or their runner-specific implementation primitive, produce
- * keyed output.
- */
-// TODO: Handle Key-preserving transforms when appropriate and more aggressively make PTransforms
-// unkeyed
-class KeyedPValueTrackingVisitor implements PipelineVisitor {
- @SuppressWarnings("rawtypes")
- private final Set<Class<? extends PTransform>> producesKeyedOutputs;
- private final Set<PValue> keyedValues;
- private boolean finalized;
-
- public static KeyedPValueTrackingVisitor create(
- @SuppressWarnings("rawtypes") Set<Class<? extends PTransform>> producesKeyedOutputs) {
- return new KeyedPValueTrackingVisitor(producesKeyedOutputs);
- }
-
- private KeyedPValueTrackingVisitor(
- @SuppressWarnings("rawtypes") Set<Class<? extends PTransform>> producesKeyedOutputs) {
- this.producesKeyedOutputs = producesKeyedOutputs;
- this.keyedValues = new HashSet<>();
- }
-
- @Override
- public void enterCompositeTransform(TransformTreeNode node) {
- checkState(
- !finalized,
- "Attempted to use a %s that has already been finalized on a pipeline (visiting node %s)",
- KeyedPValueTrackingVisitor.class.getSimpleName(),
- node);
- }
-
- @Override
- public void leaveCompositeTransform(TransformTreeNode node) {
- checkState(
- !finalized,
- "Attempted to use a %s that has already been finalized on a pipeline (visiting node %s)",
- KeyedPValueTrackingVisitor.class.getSimpleName(),
- node);
- if (node.isRootNode()) {
- finalized = true;
- } else if (producesKeyedOutputs.contains(node.getTransform().getClass())) {
- keyedValues.addAll(node.getExpandedOutputs());
- }
- }
-
- @Override
- public void visitTransform(TransformTreeNode node) {}
-
- @Override
- public void visitValue(PValue value, TransformTreeNode producer) {
- if (producesKeyedOutputs.contains(producer.getTransform().getClass())) {
- keyedValues.addAll(value.expand());
- }
- }
-
- public Set<PValue> getKeyedPValues() {
- checkState(
- finalized, "can't call getKeyedPValues before a Pipeline has been completely traversed");
- return keyedValues;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ModelEnforcement.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ModelEnforcement.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ModelEnforcement.java
deleted file mode 100644
index 4a3d17a..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ModelEnforcement.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * Enforcement tools that verify that executing code conforms to the model.
- *
- * <p>ModelEnforcement is performed on a per-element and per-bundle basis. The
- * {@link ModelEnforcement} is provided with the input bundle as part of
- * {@link ModelEnforcementFactory#forBundle(CommittedBundle, AppliedPTransform)}, each element
- * before and after that element is provided to an underlying {@link TransformEvaluator}, and the
- * output {@link InProcessTransformResult} and committed output bundles after the
- * {@link TransformEvaluator} has completed.
- *
- * <p>Typically, {@link ModelEnforcement} will obtain required metadata (such as the {@link Coder}
- * of the input {@link PCollection} on construction, and then enforce per-element behavior
- * (such as the immutability of input elements). When the element is output or the bundle is
- * completed, the required conditions can be enforced across all elements.
- */
-public interface ModelEnforcement<T> {
- /**
- * Called before a call to {@link TransformEvaluator#processElement(WindowedValue)} on the
- * provided {@link WindowedValue}.
- */
- void beforeElement(WindowedValue<T> element);
-
- /**
- * Called after a call to {@link TransformEvaluator#processElement(WindowedValue)} on the
- * provided {@link WindowedValue}.
- */
- void afterElement(WindowedValue<T> element);
-
- /**
- * Called after a bundle has been completed and {@link TransformEvaluator#finishBundle()} has been
- * called, producing the provided {@link InProcessTransformResult} and
- * {@link CommittedBundle output bundles}.
- */
- void afterFinish(
- CommittedBundle<T> input,
- InProcessTransformResult result,
- Iterable<? extends CommittedBundle<?>> outputs);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ModelEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ModelEnforcementFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ModelEnforcementFactory.java
deleted file mode 100644
index 1fa36d6..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ModelEnforcementFactory.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-
-/**
- * Creates {@link ModelEnforcement} instances for an {@link AppliedPTransform} on an input
- * {@link CommittedBundle bundle}. {@link ModelEnforcement} instances are created before the
- * {@link TransformEvaluator} is created.
- */
-public interface ModelEnforcementFactory {
- <T> ModelEnforcement<T> forBundle(CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/NanosOffsetClock.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/NanosOffsetClock.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/NanosOffsetClock.java
deleted file mode 100644
index 71039fa..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/NanosOffsetClock.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.options.DefaultValueFactory;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-import org.joda.time.Instant;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * A {@link Clock} that uses {@link System#nanoTime()} to track the progress of time.
- */
-public class NanosOffsetClock implements Clock {
- private final long baseMillis;
- private final long nanosAtBaseMillis;
-
- public static NanosOffsetClock create() {
- return new NanosOffsetClock();
- }
-
- private NanosOffsetClock() {
- baseMillis = System.currentTimeMillis();
- nanosAtBaseMillis = System.nanoTime();
- }
-
- @Override
- public Instant now() {
- return new Instant(
- baseMillis + (TimeUnit.MILLISECONDS.convert(
- System.nanoTime() - nanosAtBaseMillis, TimeUnit.NANOSECONDS)));
- }
-
- /**
- * Creates instances of {@link NanosOffsetClock}.
- */
- public static class Factory implements DefaultValueFactory<Clock> {
- @Override
- public Clock create(PipelineOptions options) {
- return new NanosOffsetClock();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/PTransformOverrideFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/PTransformOverrideFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/PTransformOverrideFactory.java
deleted file mode 100644
index 2b4bf09..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/PTransformOverrideFactory.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-
-interface PTransformOverrideFactory {
- /**
- * Create a {@link PTransform} override for the provided {@link PTransform} if applicable.
- * Otherwise, return the input {@link PTransform}.
- *
- * <p>The returned PTransform must be semantically equivalent to the input {@link PTransform}.
- */
- <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
- PTransform<InputT, OutputT> transform);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java
deleted file mode 100644
index 35639bd..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.DoFnRunner;
-import org.apache.beam.sdk.util.DoFnRunners;
-import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
-import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.common.CounterSet;
-import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-class ParDoInProcessEvaluator<T> implements TransformEvaluator<T> {
- public static <InputT, OutputT> ParDoInProcessEvaluator<InputT> create(
- InProcessEvaluationContext evaluationContext,
- CommittedBundle<InputT> inputBundle,
- AppliedPTransform<PCollection<InputT>, ?, ?> application,
- DoFn<InputT, OutputT> fn,
- List<PCollectionView<?>> sideInputs,
- TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> sideOutputTags,
- Map<TupleTag<?>, PCollection<?>> outputs) {
- InProcessExecutionContext executionContext =
- evaluationContext.getExecutionContext(application, inputBundle.getKey());
- String stepName = evaluationContext.getStepName(application);
- InProcessStepContext stepContext =
- executionContext.getOrCreateStepContext(stepName, stepName);
-
- CounterSet counters = evaluationContext.createCounterSet();
-
- Map<TupleTag<?>, UncommittedBundle<?>> outputBundles = new HashMap<>();
- for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
- outputBundles.put(
- outputEntry.getKey(),
- evaluationContext.createBundle(inputBundle, outputEntry.getValue()));
- }
-
- DoFnRunner<InputT, OutputT> runner =
- DoFnRunners.createDefault(
- evaluationContext.getPipelineOptions(),
- SerializableUtils.clone(fn),
- evaluationContext.createSideInputReader(sideInputs),
- BundleOutputManager.create(outputBundles),
- mainOutputTag,
- sideOutputTags,
- stepContext,
- counters.getAddCounterMutator(),
- application.getInput().getWindowingStrategy());
-
- try {
- runner.startBundle();
- } catch (Exception e) {
- throw UserCodeException.wrap(e);
- }
-
- return new ParDoInProcessEvaluator<>(
- runner, application, counters, outputBundles.values(), stepContext);
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////////
-
- private final DoFnRunner<T, ?> fnRunner;
- private final AppliedPTransform<PCollection<T>, ?, ?> transform;
- private final CounterSet counters;
- private final Collection<UncommittedBundle<?>> outputBundles;
- private final InProcessStepContext stepContext;
-
- private ParDoInProcessEvaluator(
- DoFnRunner<T, ?> fnRunner,
- AppliedPTransform<PCollection<T>, ?, ?> transform,
- CounterSet counters,
- Collection<UncommittedBundle<?>> outputBundles,
- InProcessStepContext stepContext) {
- this.fnRunner = fnRunner;
- this.transform = transform;
- this.counters = counters;
- this.outputBundles = outputBundles;
- this.stepContext = stepContext;
- }
-
- @Override
- public void processElement(WindowedValue<T> element) {
- try {
- fnRunner.processElement(element);
- } catch (Exception e) {
- throw UserCodeException.wrap(e);
- }
- }
-
- @Override
- public InProcessTransformResult finishBundle() {
- try {
- fnRunner.finishBundle();
- } catch (Exception e) {
- throw UserCodeException.wrap(e);
- }
- StepTransformResult.Builder resultBuilder;
- CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState();
- if (state != null) {
- resultBuilder =
- StepTransformResult.withHold(transform, state.getEarliestWatermarkHold())
- .withState(state);
- } else {
- resultBuilder = StepTransformResult.withoutHold(transform);
- }
- return resultBuilder
- .addOutput(outputBundles)
- .withTimerUpdate(stepContext.getTimerUpdate())
- .withCounters(counters)
- .build();
- }
-
- static class BundleOutputManager implements OutputManager {
- private final Map<TupleTag<?>, UncommittedBundle<?>> bundles;
- private final Map<TupleTag<?>, List<?>> undeclaredOutputs;
-
- public static BundleOutputManager create(Map<TupleTag<?>, UncommittedBundle<?>> outputBundles) {
- return new BundleOutputManager(outputBundles);
- }
-
- private BundleOutputManager(Map<TupleTag<?>, UncommittedBundle<?>> bundles) {
- this.bundles = bundles;
- undeclaredOutputs = new HashMap<>();
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
- @SuppressWarnings("rawtypes")
- UncommittedBundle bundle = bundles.get(tag);
- if (bundle == null) {
- List undeclaredContents = undeclaredOutputs.get(tag);
- if (undeclaredContents == null) {
- undeclaredContents = new ArrayList<T>();
- undeclaredOutputs.put(tag, undeclaredContents);
- }
- undeclaredContents.add(output);
- } else {
- bundle.add(output);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java
deleted file mode 100644
index 299d3a8..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo.BoundMulti;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.TupleTag;
-
-import java.util.Map;
-
-/**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
- * {@link BoundMulti} primitive {@link PTransform}.
- */
-class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
- @Override
- public <T> TransformEvaluator<T> forApplication(
- AppliedPTransform<?, ?, ?> application,
- CommittedBundle<?> inputBundle,
- InProcessEvaluationContext evaluationContext) {
- @SuppressWarnings({"unchecked", "rawtypes"})
- TransformEvaluator<T> evaluator =
- createMultiEvaluator((AppliedPTransform) application, inputBundle, evaluationContext);
- return evaluator;
- }
-
- private static <InT, OuT> ParDoInProcessEvaluator<InT> createMultiEvaluator(
- AppliedPTransform<PCollection<InT>, PCollectionTuple, BoundMulti<InT, OuT>> application,
- CommittedBundle<InT> inputBundle,
- InProcessEvaluationContext evaluationContext) {
- Map<TupleTag<?>, PCollection<?>> outputs = application.getOutput().getAll();
- DoFn<InT, OuT> fn = application.getTransform().getFn();
-
- return ParDoInProcessEvaluator.create(
- evaluationContext,
- inputBundle,
- application,
- fn,
- application.getTransform().getSideInputs(),
- application.getTransform().getMainOutputTag(),
- application.getTransform().getSideOutputTags().getAll(),
- outputs);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java
deleted file mode 100644
index 4d38448..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo.Bound;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TupleTag;
-
-import com.google.common.collect.ImmutableMap;
-
-import java.util.Collections;
-
-/**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
- * {@link Bound ParDo.Bound} primitive {@link PTransform}.
- */
-class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
- @Override
- public <T> TransformEvaluator<T> forApplication(
- final AppliedPTransform<?, ?, ?> application,
- CommittedBundle<?> inputBundle,
- InProcessEvaluationContext evaluationContext) {
- @SuppressWarnings({"unchecked", "rawtypes"})
- TransformEvaluator<T> evaluator =
- createSingleEvaluator((AppliedPTransform) application, inputBundle, evaluationContext);
- return evaluator;
- }
-
- private static <InputT, OutputT> ParDoInProcessEvaluator<InputT> createSingleEvaluator(
- @SuppressWarnings("rawtypes") AppliedPTransform<PCollection<InputT>, PCollection<OutputT>,
- Bound<InputT, OutputT>> application,
- CommittedBundle<InputT> inputBundle, InProcessEvaluationContext evaluationContext) {
- TupleTag<OutputT> mainOutputTag = new TupleTag<>("out");
-
- return ParDoInProcessEvaluator.create(
- evaluationContext,
- inputBundle,
- application,
- application.getTransform().getFn(),
- application.getTransform().getSideInputs(),
- mainOutputTag,
- Collections.<TupleTag<?>>emptyList(),
- ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, application.getOutput()));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/PassthroughTransformEvaluator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/PassthroughTransformEvaluator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/PassthroughTransformEvaluator.java
deleted file mode 100644
index a90cd7b..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/PassthroughTransformEvaluator.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.util.WindowedValue;
-
-class PassthroughTransformEvaluator<InputT> implements TransformEvaluator<InputT> {
- public static <InputT> PassthroughTransformEvaluator<InputT> create(
- AppliedPTransform<?, ?, ?> transform, UncommittedBundle<InputT> output) {
- return new PassthroughTransformEvaluator<>(transform, output);
- }
-
- private final AppliedPTransform<?, ?, ?> transform;
- private final UncommittedBundle<InputT> output;
-
- private PassthroughTransformEvaluator(
- AppliedPTransform<?, ?, ?> transform, UncommittedBundle<InputT> output) {
- this.transform = transform;
- this.output = output;
- }
-
- @Override
- public void processElement(WindowedValue<InputT> element) throws Exception {
- output.add(element);
- }
-
- @Override
- public InProcessTransformResult finishBundle() throws Exception {
- return StepTransformResult.withoutHold(transform).addOutput(output).build();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ShardControlledWrite.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ShardControlledWrite.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ShardControlledWrite.java
deleted file mode 100644
index 88630ad..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ShardControlledWrite.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.Partition;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PDone;
-
-import java.util.concurrent.ThreadLocalRandom;
-
-/**
- * A write that explicitly controls its number of output shards.
- */
-abstract class ShardControlledWrite<InputT>
- extends ForwardingPTransform<PCollection<InputT>, PDone> {
- @Override
- public PDone apply(PCollection<InputT> input) {
- int numShards = getNumShards();
- checkArgument(
- numShards >= 1,
- "%s should only be applied if the output has a controlled number of shards (> 1); got %s",
- getClass().getSimpleName(),
- getNumShards());
- PCollectionList<InputT> shards =
- input.apply(
- "PartitionInto" + numShards + "Shards",
- Partition.of(getNumShards(), new RandomSeedPartitionFn<InputT>()));
- for (int i = 0; i < shards.size(); i++) {
- PCollection<InputT> shard = shards.get(i);
- PTransform<? super PCollection<InputT>, PDone> writeShard = getSingleShardTransform(i);
- shard.apply(String.format("%s(Shard:%s)", writeShard.getName(), i), writeShard);
- }
- return PDone.in(input.getPipeline());
- }
-
- /**
- * Returns the number of shards this {@link PTransform} should write to.
- */
- abstract int getNumShards();
-
- /**
- * Returns a {@link PTransform} that performs a write to the shard with the specified shard
- * number.
- *
- * <p>This method will be called n times, where n is the value of {@link #getNumShards()}, for
- * shard numbers {@code [0...n)}.
- */
- abstract PTransform<? super PCollection<InputT>, PDone> getSingleShardTransform(int shardNum);
-
- private static class RandomSeedPartitionFn<T> implements Partition.PartitionFn<T> {
- int nextPartition = -1;
- @Override
- public int partitionFor(T elem, int numPartitions) {
- if (nextPartition < 0) {
- nextPartition = ThreadLocalRandom.current().nextInt(numPartitions);
- }
- nextPartition++;
- nextPartition %= numPartitions;
- return nextPartition;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/StepAndKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/StepAndKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/StepAndKey.java
deleted file mode 100644
index 9c4d9aa..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/StepAndKey.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-
-import com.google.common.base.MoreObjects;
-
-import java.util.Objects;
-
-/**
- * A (Step, Key) pair. This is useful as a map key or cache key for things that are available
- * per-step in a keyed manner (e.g. State).
- */
-final class StepAndKey {
- private final AppliedPTransform<?, ?, ?> step;
- private final Object key;
-
- /**
- * Create a new {@link StepAndKey} with the provided step and key.
- */
- public static StepAndKey of(AppliedPTransform<?, ?, ?> step, Object key) {
- return new StepAndKey(step, key);
- }
-
- private StepAndKey(AppliedPTransform<?, ?, ?> step, Object key) {
- this.step = step;
- this.key = key;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(StepAndKey.class)
- .add("step", step.getFullName())
- .add("key", key)
- .toString();
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(step, key);
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == this) {
- return true;
- } else if (!(other instanceof StepAndKey)) {
- return false;
- } else {
- StepAndKey that = (StepAndKey) other;
- return Objects.equals(this.step, that.step)
- && Objects.equals(this.key, that.key);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/StepTransformResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/StepTransformResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/StepTransformResult.java
deleted file mode 100644
index 8874eda..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/StepTransformResult.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.common.CounterSet;
-import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
-
-import org.joda.time.Instant;
-
-import java.util.Collection;
-
-import javax.annotation.Nullable;
-
-/**
- * An immutable {@link InProcessTransformResult}.
- */
-public class StepTransformResult implements InProcessTransformResult {
- private final AppliedPTransform<?, ?, ?> transform;
- private final Iterable<? extends UncommittedBundle<?>> bundles;
- @Nullable private final CopyOnAccessInMemoryStateInternals<?> state;
- private final TimerUpdate timerUpdate;
- @Nullable private final CounterSet counters;
- private final Instant watermarkHold;
-
- private StepTransformResult(
- AppliedPTransform<?, ?, ?> transform,
- Iterable<? extends UncommittedBundle<?>> outputBundles,
- CopyOnAccessInMemoryStateInternals<?> state,
- TimerUpdate timerUpdate,
- CounterSet counters,
- Instant watermarkHold) {
- this.transform = checkNotNull(transform);
- this.bundles = checkNotNull(outputBundles);
- this.state = state;
- this.timerUpdate = checkNotNull(timerUpdate);
- this.counters = counters;
- this.watermarkHold = checkNotNull(watermarkHold);
- }
-
- @Override
- public Iterable<? extends UncommittedBundle<?>> getOutputBundles() {
- return bundles;
- }
-
- @Override
- public CounterSet getCounters() {
- return counters;
- }
-
- @Override
- public AppliedPTransform<?, ?, ?> getTransform() {
- return transform;
- }
-
- @Override
- public Instant getWatermarkHold() {
- return watermarkHold;
- }
-
- @Nullable
- @Override
- public CopyOnAccessInMemoryStateInternals<?> getState() {
- return state;
- }
-
- @Override
- public TimerUpdate getTimerUpdate() {
- return timerUpdate;
- }
-
- public static Builder withHold(AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) {
- return new Builder(transform, watermarkHold);
- }
-
- public static Builder withoutHold(AppliedPTransform<?, ?, ?> transform) {
- return new Builder(transform, BoundedWindow.TIMESTAMP_MAX_VALUE);
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(StepTransformResult.class)
- .add("transform", transform)
- .toString();
- }
-
- /**
- * A builder for creating instances of {@link StepTransformResult}.
- */
- public static class Builder {
- private final AppliedPTransform<?, ?, ?> transform;
- private final ImmutableList.Builder<UncommittedBundle<?>> bundlesBuilder;
- private CopyOnAccessInMemoryStateInternals<?> state;
- private TimerUpdate timerUpdate;
- private CounterSet counters;
- private final Instant watermarkHold;
-
- private Builder(AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) {
- this.transform = transform;
- this.watermarkHold = watermarkHold;
- this.bundlesBuilder = ImmutableList.builder();
- this.timerUpdate = TimerUpdate.builder(null).build();
- }
-
- public StepTransformResult build() {
- return new StepTransformResult(
- transform,
- bundlesBuilder.build(),
- state,
- timerUpdate,
- counters,
- watermarkHold);
- }
-
- public Builder withCounters(CounterSet counters) {
- this.counters = counters;
- return this;
- }
-
- public Builder withState(CopyOnAccessInMemoryStateInternals<?> state) {
- this.state = state;
- return this;
- }
-
- public Builder withTimerUpdate(TimerUpdate timerUpdate) {
- this.timerUpdate = timerUpdate;
- return this;
- }
-
- public Builder addOutput(
- UncommittedBundle<?> outputBundle, UncommittedBundle<?>... outputBundles) {
- bundlesBuilder.add(outputBundle);
- bundlesBuilder.add(outputBundles);
- return this;
- }
-
- public Builder addOutput(Collection<UncommittedBundle<?>> outputBundles) {
- bundlesBuilder.addAll(outputBundles);
- return this;
- }
- }
-}