You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/04/29 23:56:03 UTC
[05/17] incubator-beam git commit: Move InProcessRunner to its own
module
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TextIOShardedWriteFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TextIOShardedWriteFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TextIOShardedWriteFactory.java
deleted file mode 100644
index fac5a40..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TextIOShardedWriteFactory.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.TextIO.Write.Bound;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.IOChannelUtils;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-
-class TextIOShardedWriteFactory implements PTransformOverrideFactory {
-
- @Override
- public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
- PTransform<InputT, OutputT> transform) {
- if (transform instanceof TextIO.Write.Bound) {
- @SuppressWarnings("unchecked")
- TextIO.Write.Bound<InputT> originalWrite = (TextIO.Write.Bound<InputT>) transform;
- if (originalWrite.getNumShards() > 1
- || (originalWrite.getNumShards() == 1
- && !"".equals(originalWrite.getShardNameTemplate()))) {
- @SuppressWarnings("unchecked")
- PTransform<InputT, OutputT> override =
- (PTransform<InputT, OutputT>) new TextIOShardedWrite<InputT>(originalWrite);
- return override;
- }
- }
- return transform;
- }
-
- private static class TextIOShardedWrite<InputT> extends ShardControlledWrite<InputT> {
- private final TextIO.Write.Bound<InputT> initial;
-
- private TextIOShardedWrite(Bound<InputT> initial) {
- this.initial = initial;
- }
-
- @Override
- int getNumShards() {
- return initial.getNumShards();
- }
-
- @Override
- PTransform<PCollection<InputT>, PDone> getSingleShardTransform(int shardNum) {
- String shardName =
- IOChannelUtils.constructName(
- initial.getFilenamePrefix(),
- initial.getShardTemplate(),
- initial.getFilenameSuffix(),
- shardNum,
- getNumShards());
- return TextIO.Write.withCoder(initial.getCoder()).to(shardName).withoutSharding();
- }
-
- @Override
- protected PTransform<PCollection<InputT>, PDone> delegate() {
- return initial;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluator.java
deleted file mode 100644
index e002329..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluator.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.util.WindowedValue;
-
-/**
- * An evaluator of a specific application of a transform. Will be used for at least one
- * {@link CommittedBundle}.
- *
- * @param <InputT> the type of elements that will be passed to {@link #processElement}
- */
-public interface TransformEvaluator<InputT> {
- /**
- * Process an element in the input {@link CommittedBundle}.
- *
- * @param element the element to process
- */
- void processElement(WindowedValue<InputT> element) throws Exception;
-
- /**
- * Finish processing the bundle of this {@link TransformEvaluator}.
- *
- * After {@link #finishBundle()} is called, the {@link TransformEvaluator} will not be reused,
- * and no more elements will be processed.
- *
- * @return an {@link InProcessTransformResult} containing the results of this bundle evaluation.
- */
- InProcessTransformResult finishBundle() throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluatorFactory.java
deleted file mode 100644
index a9f6759..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluatorFactory.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-
-import javax.annotation.Nullable;
-
-/**
- * A factory for creating instances of {@link TransformEvaluator} for the application of a
- * {@link PTransform}.
- */
-public interface TransformEvaluatorFactory {
- /**
- * Create a new {@link TransformEvaluator} for the application of the {@link PTransform}.
- *
- * Any work that must be done before input elements are processed (such as calling
- * {@link DoFn#startBundle(DoFn.Context)}) must be done before the {@link TransformEvaluator} is
- * made available to the caller.
- *
- * @throws Exception whenever constructing the underlying evaluator throws an exception
- */
- <InputT> TransformEvaluator<InputT> forApplication(
- AppliedPTransform<?, ?, ?> application, @Nullable CommittedBundle<?> inputBundle,
- InProcessEvaluationContext evaluationContext) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluatorRegistry.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluatorRegistry.java
deleted file mode 100644
index f6542b8..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluatorRegistry.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.Window;
-
-import com.google.common.collect.ImmutableMap;
-
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link TransformEvaluatorFactory} that delegates to primitive {@link TransformEvaluatorFactory}
- * implementations based on the type of {@link PTransform} of the application.
- */
-class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
- public static TransformEvaluatorRegistry defaultRegistry() {
- @SuppressWarnings("rawtypes")
- ImmutableMap<Class<? extends PTransform>, TransformEvaluatorFactory> primitives =
- ImmutableMap.<Class<? extends PTransform>, TransformEvaluatorFactory>builder()
- .put(Read.Bounded.class, new BoundedReadEvaluatorFactory())
- .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory())
- .put(ParDo.Bound.class, new ParDoSingleEvaluatorFactory())
- .put(ParDo.BoundMulti.class, new ParDoMultiEvaluatorFactory())
- .put(
- GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly.class,
- new GroupByKeyEvaluatorFactory())
- .put(FlattenPCollectionList.class, new FlattenEvaluatorFactory())
- .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory())
- .put(Window.Bound.class, new WindowEvaluatorFactory())
- .build();
- return new TransformEvaluatorRegistry(primitives);
- }
-
- // the TransformEvaluatorFactories can construct instances of all generic types of transform,
- // so all instances of a primitive can be handled with the same evaluator factory.
- @SuppressWarnings("rawtypes")
- private final Map<Class<? extends PTransform>, TransformEvaluatorFactory> factories;
-
- private TransformEvaluatorRegistry(
- @SuppressWarnings("rawtypes")
- Map<Class<? extends PTransform>, TransformEvaluatorFactory> factories) {
- this.factories = factories;
- }
-
- @Override
- public <InputT> TransformEvaluator<InputT> forApplication(
- AppliedPTransform<?, ?, ?> application,
- @Nullable CommittedBundle<?> inputBundle,
- InProcessEvaluationContext evaluationContext)
- throws Exception {
- TransformEvaluatorFactory factory = factories.get(application.getTransform().getClass());
- return factory.forApplication(application, inputBundle, evaluationContext);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutor.java
deleted file mode 100644
index a93c7b2..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutor.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.util.WindowedValue;
-
-import com.google.common.base.Throwables;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link Callable} responsible for constructing a {@link TransformEvaluator} from a
- * {@link TransformEvaluatorFactory} and evaluating it on some bundle of input, and registering
- * the result using a registered {@link CompletionCallback}.
- *
- * <p>A {@link TransformExecutor} that is currently executing also provides access to the thread
- * that it is being executed on.
- */
-class TransformExecutor<T> implements Callable<InProcessTransformResult> {
- public static <T> TransformExecutor<T> create(
- TransformEvaluatorFactory factory,
- Iterable<? extends ModelEnforcementFactory> modelEnforcements,
- InProcessEvaluationContext evaluationContext,
- CommittedBundle<T> inputBundle,
- AppliedPTransform<?, ?, ?> transform,
- CompletionCallback completionCallback,
- TransformExecutorService transformEvaluationState) {
- return new TransformExecutor<>(
- factory,
- modelEnforcements,
- evaluationContext,
- inputBundle,
- transform,
- completionCallback,
- transformEvaluationState);
- }
-
- private final TransformEvaluatorFactory evaluatorFactory;
- private final Iterable<? extends ModelEnforcementFactory> modelEnforcements;
-
- private final InProcessEvaluationContext evaluationContext;
-
- /** The transform that will be evaluated. */
- private final AppliedPTransform<?, ?, ?> transform;
- /** The inputs this {@link TransformExecutor} will deliver to the transform. */
- private final CommittedBundle<T> inputBundle;
-
- private final CompletionCallback onComplete;
- private final TransformExecutorService transformEvaluationState;
-
- private final AtomicReference<Thread> thread;
-
- private TransformExecutor(
- TransformEvaluatorFactory factory,
- Iterable<? extends ModelEnforcementFactory> modelEnforcements,
- InProcessEvaluationContext evaluationContext,
- CommittedBundle<T> inputBundle,
- AppliedPTransform<?, ?, ?> transform,
- CompletionCallback completionCallback,
- TransformExecutorService transformEvaluationState) {
- this.evaluatorFactory = factory;
- this.modelEnforcements = modelEnforcements;
- this.evaluationContext = evaluationContext;
-
- this.inputBundle = inputBundle;
- this.transform = transform;
-
- this.onComplete = completionCallback;
-
- this.transformEvaluationState = transformEvaluationState;
- this.thread = new AtomicReference<>();
- }
-
- @Override
- public InProcessTransformResult call() {
- checkState(
- thread.compareAndSet(null, Thread.currentThread()),
- "Tried to execute %s for %s on thread %s, but is already executing on thread %s",
- TransformExecutor.class.getSimpleName(),
- transform.getFullName(),
- Thread.currentThread(),
- thread.get());
- try {
- Collection<ModelEnforcement<T>> enforcements = new ArrayList<>();
- for (ModelEnforcementFactory enforcementFactory : modelEnforcements) {
- ModelEnforcement<T> enforcement = enforcementFactory.forBundle(inputBundle, transform);
- enforcements.add(enforcement);
- }
- TransformEvaluator<T> evaluator =
- evaluatorFactory.forApplication(transform, inputBundle, evaluationContext);
-
- processElements(evaluator, enforcements);
-
- InProcessTransformResult result = finishBundle(evaluator, enforcements);
- return result;
- } catch (Throwable t) {
- onComplete.handleThrowable(inputBundle, t);
- throw Throwables.propagate(t);
- } finally {
- transformEvaluationState.complete(this);
- }
- }
-
- /**
- * Processes all the elements in the input bundle using the transform evaluator, applying any
- * necessary {@link ModelEnforcement ModelEnforcements}.
- */
- private void processElements(
- TransformEvaluator<T> evaluator, Collection<ModelEnforcement<T>> enforcements)
- throws Exception {
- if (inputBundle != null) {
- for (WindowedValue<T> value : inputBundle.getElements()) {
- for (ModelEnforcement<T> enforcement : enforcements) {
- enforcement.beforeElement(value);
- }
-
- evaluator.processElement(value);
-
- for (ModelEnforcement<T> enforcement : enforcements) {
- enforcement.afterElement(value);
- }
- }
- }
- }
-
- /**
- * Finishes processing the input bundle and commit the result using the
- * {@link CompletionCallback}, applying any {@link ModelEnforcement} if necessary.
- *
- * @return the {@link InProcessTransformResult} produced by
- * {@link TransformEvaluator#finishBundle()}
- */
- private InProcessTransformResult finishBundle(
- TransformEvaluator<T> evaluator, Collection<ModelEnforcement<T>> enforcements)
- throws Exception {
- InProcessTransformResult result = evaluator.finishBundle();
- CommittedResult outputs = onComplete.handleResult(inputBundle, result);
- for (ModelEnforcement<T> enforcement : enforcements) {
- enforcement.afterFinish(inputBundle, result, outputs.getOutputs());
- }
- return result;
- }
-
- /**
- * If this {@link TransformExecutor} is currently executing, return the thread it is executing in.
- * Otherwise, return null.
- */
- @Nullable
- public Thread getThread() {
- return thread.get();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorService.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorService.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorService.java
deleted file mode 100644
index 600102f..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorService.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-/**
- * Schedules and completes {@link TransformExecutor TransformExecutors}, controlling concurrency as
- * appropriate for the {@link StepAndKey} the executor exists for.
- */
-interface TransformExecutorService {
- /**
- * Schedule the provided work to be eventually executed.
- */
- void schedule(TransformExecutor<?> work);
-
- /**
- * Finish executing the provided work. This may cause additional
- * {@link TransformExecutor TransformExecutors} to be evaluated.
- */
- void complete(TransformExecutor<?> completed);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorServices.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorServices.java
deleted file mode 100644
index 3194340..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorServices.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import com.google.common.base.MoreObjects;
-
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Static factory methods for constructing instances of {@link TransformExecutorService}.
- */
-final class TransformExecutorServices {
- private TransformExecutorServices() {
- // Do not instantiate
- }
-
- /**
- * Returns an EvaluationState that evaluates {@link TransformExecutor TransformExecutors} in
- * parallel.
- */
- public static TransformExecutorService parallel(
- ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
- return new ParallelEvaluationState(executor, scheduled);
- }
-
- /**
- * Returns an EvaluationState that evaluates {@link TransformExecutor TransformExecutors} in
- * serial.
- */
- public static TransformExecutorService serial(
- ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
- return new SerialEvaluationState(executor, scheduled);
- }
-
- /**
- * A {@link TransformExecutorService} with unlimited parallelism. Any {@link TransformExecutor}
- * scheduled will be immediately submitted to the {@link ExecutorService}.
- *
- * <p>A principal use of this is for the evaluation of an unkeyed Step. Unkeyed computations are
- * processed in parallel.
- */
- private static class ParallelEvaluationState implements TransformExecutorService {
- private final ExecutorService executor;
- private final Map<TransformExecutor<?>, Boolean> scheduled;
-
- private ParallelEvaluationState(
- ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
- this.executor = executor;
- this.scheduled = scheduled;
- }
-
- @Override
- public void schedule(TransformExecutor<?> work) {
- executor.submit(work);
- scheduled.put(work, true);
- }
-
- @Override
- public void complete(TransformExecutor<?> completed) {
- scheduled.remove(completed);
- }
- }
-
- /**
- * A {@link TransformExecutorService} with a single work queue. Any {@link TransformExecutor}
- * scheduled will be placed on the work queue. Only one item of work will be submitted to the
- * {@link ExecutorService} at any time.
- *
- * <p>A principal use of this is for the serial evaluation of a (Step, Key) pair.
- * Keyed computations are processed serially per step.
- */
- private static class SerialEvaluationState implements TransformExecutorService {
- private final ExecutorService executor;
- private final Map<TransformExecutor<?>, Boolean> scheduled;
-
- private AtomicReference<TransformExecutor<?>> currentlyEvaluating;
- private final Queue<TransformExecutor<?>> workQueue;
-
- private SerialEvaluationState(
- ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
- this.scheduled = scheduled;
- this.executor = executor;
- this.currentlyEvaluating = new AtomicReference<>();
- this.workQueue = new ConcurrentLinkedQueue<>();
- }
-
- /**
- * Schedules the work, adding it to the work queue if there is a bundle currently being
- * evaluated and scheduling it immediately otherwise.
- */
- @Override
- public void schedule(TransformExecutor<?> work) {
- workQueue.offer(work);
- updateCurrentlyEvaluating();
- }
-
- @Override
- public void complete(TransformExecutor<?> completed) {
- if (!currentlyEvaluating.compareAndSet(completed, null)) {
- throw new IllegalStateException(
- "Finished work "
- + completed
- + " but could not complete due to unexpected currently executing "
- + currentlyEvaluating.get());
- }
- scheduled.remove(completed);
- updateCurrentlyEvaluating();
- }
-
- private void updateCurrentlyEvaluating() {
- if (currentlyEvaluating.get() == null) {
- // Only synchronize if we need to update what's currently evaluating
- synchronized (this) {
- TransformExecutor<?> newWork = workQueue.poll();
- if (newWork != null) {
- if (currentlyEvaluating.compareAndSet(null, newWork)) {
- scheduled.put(newWork, true);
- executor.submit(newWork);
- } else {
- workQueue.offer(newWork);
- }
- }
- }
- }
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(SerialEvaluationState.class)
- .add("currentlyEvaluating", currentlyEvaluating)
- .add("workQueue", workQueue)
- .toString();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
deleted file mode 100644
index 0cebf43..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.io.Read.Unbounded;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
-import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-
-import java.io.IOException;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators}
- * for the {@link Unbounded Read.Unbounded} primitive {@link PTransform}.
- */
-class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
- /*
- * An evaluator for a Source is stateful, to ensure the CheckpointMark is properly persisted.
- * Evaluators are cached here to ensure that the checkpoint mark is appropriately reused
- * and any splits are honored.
- */
- private final ConcurrentMap<EvaluatorKey, Queue<? extends UnboundedReadEvaluator<?>>>
- sourceEvaluators = new ConcurrentHashMap<>();
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- @Override
- public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> application,
- @Nullable CommittedBundle<?> inputBundle, InProcessEvaluationContext evaluationContext) {
- return getTransformEvaluator((AppliedPTransform) application, evaluationContext);
- }
-
- private <OutputT> TransformEvaluator<?> getTransformEvaluator(
- final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
- final InProcessEvaluationContext evaluationContext) {
- UnboundedReadEvaluator<?> currentEvaluator =
- getTransformEvaluatorQueue(transform, evaluationContext).poll();
- if (currentEvaluator == null) {
- return EmptyTransformEvaluator.create(transform);
- }
- return currentEvaluator;
- }
-
- /**
- * Get the queue of {@link TransformEvaluator TransformEvaluators} that produce elements for the
- * provided application of {@link Unbounded Read.Unbounded}, initializing it if required.
- *
- * <p>This method is thread-safe, and will only produce new evaluators if no other invocation has
- * already done so.
- */
- @SuppressWarnings("unchecked")
- private <OutputT> Queue<UnboundedReadEvaluator<OutputT>> getTransformEvaluatorQueue(
- final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
- final InProcessEvaluationContext evaluationContext) {
- // Key by the application and the context the evaluation is occurring in (which call to
- // Pipeline#run).
- EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
- @SuppressWarnings("unchecked")
- Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue =
- (Queue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
- if (evaluatorQueue == null) {
- evaluatorQueue = new ConcurrentLinkedQueue<>();
- if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) {
- // If no queue existed in the evaluators, add an evaluator to initialize the evaluator
- // factory for this transform
- UnboundedSource<OutputT, ?> source = transform.getTransform().getSource();
- UnboundedReadEvaluator<OutputT> evaluator =
- new UnboundedReadEvaluator<OutputT>(
- transform, evaluationContext, source, evaluatorQueue);
- evaluatorQueue.offer(evaluator);
- } else {
- // otherwise return the existing Queue that arrived before us
- evaluatorQueue = (Queue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
- }
- }
- return evaluatorQueue;
- }
-
- /**
- * A {@link UnboundedReadEvaluator} produces elements from an underlying {@link UnboundedSource},
- * discarding all input elements. Within the call to {@link #finishBundle()}, the evaluator
- * creates the {@link UnboundedReader} and consumes some currently available input.
- *
- * <p>Calls to {@link UnboundedReadEvaluator} are not internally thread-safe, and should only be
- * used by a single thread at a time. Each {@link UnboundedReadEvaluator} maintains its own
- * checkpoint, and constructs its reader from the current checkpoint in each call to
- * {@link #finishBundle()}.
- */
- private static class UnboundedReadEvaluator<OutputT> implements TransformEvaluator<Object> {
- private static final int ARBITRARY_MAX_ELEMENTS = 10;
- private final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform;
- private final InProcessEvaluationContext evaluationContext;
- private final Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue;
- /**
- * The source being read from by this {@link UnboundedReadEvaluator}. This may not be the same
- * source as derived from {@link #transform} due to splitting.
- */
- private final UnboundedSource<OutputT, ?> source;
- private CheckpointMark checkpointMark;
-
- public UnboundedReadEvaluator(
- AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
- InProcessEvaluationContext evaluationContext,
- UnboundedSource<OutputT, ?> source,
- Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue) {
- this.transform = transform;
- this.evaluationContext = evaluationContext;
- this.evaluatorQueue = evaluatorQueue;
- this.source = source;
- this.checkpointMark = null;
- }
-
- @Override
- public void processElement(WindowedValue<Object> element) {}
-
- @Override
- public InProcessTransformResult finishBundle() throws IOException {
- UncommittedBundle<OutputT> output = evaluationContext.createRootBundle(transform.getOutput());
- try (UnboundedReader<OutputT> reader =
- createReader(source, evaluationContext.getPipelineOptions());) {
- int numElements = 0;
- if (reader.start()) {
- do {
- output.add(
- WindowedValue.timestampedValueInGlobalWindow(
- reader.getCurrent(), reader.getCurrentTimestamp()));
- numElements++;
- } while (numElements < ARBITRARY_MAX_ELEMENTS && reader.advance());
- }
- checkpointMark = reader.getCheckpointMark();
- checkpointMark.finalizeCheckpoint();
- // TODO: When exercising create initial splits, make this the minimum watermark across all
- // existing readers
- StepTransformResult result =
- StepTransformResult.withHold(transform, reader.getWatermark())
- .addOutput(output)
- .build();
- evaluatorQueue.offer(this);
- return result;
- }
- }
-
- private <CheckpointMarkT extends CheckpointMark> UnboundedReader<OutputT> createReader(
- UnboundedSource<OutputT, CheckpointMarkT> source, PipelineOptions options) {
- @SuppressWarnings("unchecked")
- CheckpointMarkT mark = (CheckpointMarkT) checkpointMark;
- return source.createReader(options, mark);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ViewEvaluatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ViewEvaluatorFactory.java
deleted file mode 100644
index 0b54ba8..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ViewEvaluatorFactory.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.Values;
-import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
-import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
- * {@link CreatePCollectionView} primitive {@link PTransform}.
- *
- * <p>The {@link ViewEvaluatorFactory} produces {@link TransformEvaluator TransformEvaluators} for
- * the {@link WriteView} {@link PTransform}, which is part of the
- * {@link InProcessCreatePCollectionView} composite transform. This transform is an override for the
- * {@link CreatePCollectionView} transform that applies windowing and triggers before the view is
- * written.
- */
-class ViewEvaluatorFactory implements TransformEvaluatorFactory {
- @Override
- public <T> TransformEvaluator<T> forApplication(
- AppliedPTransform<?, ?, ?> application,
- InProcessPipelineRunner.CommittedBundle<?> inputBundle,
- InProcessEvaluationContext evaluationContext) {
- @SuppressWarnings({"cast", "unchecked", "rawtypes"})
- TransformEvaluator<T> evaluator = createEvaluator(
- (AppliedPTransform) application, evaluationContext);
- return evaluator;
- }
-
- private <InT, OuT> TransformEvaluator<Iterable<InT>> createEvaluator(
- final AppliedPTransform<PCollection<Iterable<InT>>, PCollectionView<OuT>, WriteView<InT, OuT>>
- application,
- InProcessEvaluationContext context) {
- PCollection<Iterable<InT>> input = application.getInput();
- final PCollectionViewWriter<InT, OuT> writer =
- context.createPCollectionViewWriter(input, application.getOutput());
- return new TransformEvaluator<Iterable<InT>>() {
- private final List<WindowedValue<InT>> elements = new ArrayList<>();
-
- @Override
- public void processElement(WindowedValue<Iterable<InT>> element) {
- for (InT input : element.getValue()) {
- elements.add(element.withValue(input));
- }
- }
-
- @Override
- public InProcessTransformResult finishBundle() {
- writer.add(elements);
- return StepTransformResult.withoutHold(application).build();
- }
- };
- }
-
- public static class InProcessViewOverrideFactory implements PTransformOverrideFactory {
- @Override
- public <InputT extends PInput, OutputT extends POutput>
- PTransform<InputT, OutputT> override(PTransform<InputT, OutputT> transform) {
- if (transform instanceof CreatePCollectionView) {
-
- }
- @SuppressWarnings({"rawtypes", "unchecked"})
- PTransform<InputT, OutputT> createView =
- (PTransform<InputT, OutputT>)
- new InProcessCreatePCollectionView<>((CreatePCollectionView) transform);
- return createView;
- }
- }
-
- /**
- * An in-process override for {@link CreatePCollectionView}.
- */
- private static class InProcessCreatePCollectionView<ElemT, ViewT>
- extends ForwardingPTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
- private final CreatePCollectionView<ElemT, ViewT> og;
-
- private InProcessCreatePCollectionView(CreatePCollectionView<ElemT, ViewT> og) {
- this.og = og;
- }
-
- @Override
- public PCollectionView<ViewT> apply(PCollection<ElemT> input) {
- return input.apply(WithKeys.<Void, ElemT>of((Void) null))
- .setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()))
- .apply(GroupByKey.<Void, ElemT>create())
- .apply(Values.<Iterable<ElemT>>create())
- .apply(new WriteView<ElemT, ViewT>(og));
- }
-
- @Override
- protected PTransform<PCollection<ElemT>, PCollectionView<ViewT>> delegate() {
- return og;
- }
- }
-
- /**
- * An in-process implementation of the {@link CreatePCollectionView} primitive.
- *
- * This implementation requires the input {@link PCollection} to be an iterable, which is provided
- * to {@link PCollectionView#fromIterableInternal(Iterable)}.
- */
- public static final class WriteView<ElemT, ViewT>
- extends PTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>> {
- private final CreatePCollectionView<ElemT, ViewT> og;
-
- WriteView(CreatePCollectionView<ElemT, ViewT> og) {
- this.og = og;
- }
-
- @Override
- public PCollectionView<ViewT> apply(PCollection<Iterable<ElemT>> input) {
- return og.getView();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/WatermarkCallbackExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/WatermarkCallbackExecutor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/WatermarkCallbackExecutor.java
deleted file mode 100644
index 3e4aca6..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/WatermarkCallbackExecutor.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowingStrategy;
-
-import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.Ordering;
-
-import org.joda.time.Instant;
-
-import java.util.PriorityQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * Executes callbacks that occur based on the progression of the watermark per-step.
- *
- * <p>Callbacks are registered by calls to
- * {@link #callOnGuaranteedFiring(AppliedPTransform, BoundedWindow, WindowingStrategy, Runnable)},
- * and are executed after a call to {@link #fireForWatermark(AppliedPTransform, Instant)} with the
- * same {@link AppliedPTransform} and a watermark sufficient to ensure that the trigger for the
- * windowing strategy would have been produced.
- *
- * <p>NOTE: {@link WatermarkCallbackExecutor} does not track the latest observed watermark for any
- * {@link AppliedPTransform} - any call to
- * {@link #callOnGuaranteedFiring(AppliedPTransform, BoundedWindow, WindowingStrategy, Runnable)}
- * that could have potentially already fired should be followed by a call to
- * {@link #fireForWatermark(AppliedPTransform, Instant)} for the same transform with the current
- * value of the watermark.
- */
-class WatermarkCallbackExecutor {
- /**
- * Create a new {@link WatermarkCallbackExecutor}.
- */
- public static WatermarkCallbackExecutor create() {
- return new WatermarkCallbackExecutor();
- }
-
- private final ConcurrentMap<AppliedPTransform<?, ?, ?>, PriorityQueue<WatermarkCallback>>
- callbacks;
- private final ExecutorService executor;
-
- private WatermarkCallbackExecutor() {
- this.callbacks = new ConcurrentHashMap<>();
- this.executor = Executors.newSingleThreadExecutor();
- }
-
- /**
- * Execute the provided {@link Runnable} after the next call to
- * {@link #fireForWatermark(AppliedPTransform, Instant)} where the window is guaranteed to have
- * produced output.
- */
- public void callOnGuaranteedFiring(
- AppliedPTransform<?, ?, ?> step,
- BoundedWindow window,
- WindowingStrategy<?, ?> windowingStrategy,
- Runnable runnable) {
- WatermarkCallback callback =
- WatermarkCallback.onGuaranteedFiring(window, windowingStrategy, runnable);
-
- PriorityQueue<WatermarkCallback> callbackQueue = callbacks.get(step);
- if (callbackQueue == null) {
- callbackQueue = new PriorityQueue<>(11, new CallbackOrdering());
- if (callbacks.putIfAbsent(step, callbackQueue) != null) {
- callbackQueue = callbacks.get(step);
- }
- }
-
- synchronized (callbackQueue) {
- callbackQueue.offer(callback);
- }
- }
-
- /**
- * Schedule all pending callbacks that must have produced output by the time of the provided
- * watermark.
- */
- public void fireForWatermark(AppliedPTransform<?, ?, ?> step, Instant watermark) {
- PriorityQueue<WatermarkCallback> callbackQueue = callbacks.get(step);
- if (callbackQueue == null) {
- return;
- }
- synchronized (callbackQueue) {
- while (!callbackQueue.isEmpty() && callbackQueue.peek().shouldFire(watermark)) {
- executor.submit(callbackQueue.poll().getCallback());
- }
- }
- }
-
- private static class WatermarkCallback {
- public static <W extends BoundedWindow> WatermarkCallback onGuaranteedFiring(
- BoundedWindow window, WindowingStrategy<?, W> strategy, Runnable callback) {
- @SuppressWarnings("unchecked")
- Instant firingAfter =
- strategy.getTrigger().getSpec().getWatermarkThatGuaranteesFiring((W) window);
- return new WatermarkCallback(firingAfter, callback);
- }
-
- private final Instant fireAfter;
- private final Runnable callback;
-
- private WatermarkCallback(Instant fireAfter, Runnable callback) {
- this.fireAfter = fireAfter;
- this.callback = callback;
- }
-
- public boolean shouldFire(Instant currentWatermark) {
- return currentWatermark.isAfter(fireAfter)
- || currentWatermark.equals(BoundedWindow.TIMESTAMP_MAX_VALUE);
- }
-
- public Runnable getCallback() {
- return callback;
- }
- }
-
- private static class CallbackOrdering extends Ordering<WatermarkCallback> {
- @Override
- public int compare(WatermarkCallback left, WatermarkCallback right) {
- return ComparisonChain.start()
- .compare(left.fireAfter, right.fireAfter)
- .compare(left.callback, right.callback, Ordering.arbitrary())
- .result();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/WindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/WindowEvaluatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/WindowEvaluatorFactory.java
deleted file mode 100644
index 4cdacec..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/WindowEvaluatorFactory.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.Window.Bound;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-
-import org.joda.time.Instant;
-
-import java.util.Collection;
-
-import javax.annotation.Nullable;
-
-/**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
- * {@link Bound Window.Bound} primitive {@link PTransform}.
- */
-class WindowEvaluatorFactory implements TransformEvaluatorFactory {
-
- @Override
- public <InputT> TransformEvaluator<InputT> forApplication(
- AppliedPTransform<?, ?, ?> application,
- @Nullable CommittedBundle<?> inputBundle,
- InProcessEvaluationContext evaluationContext)
- throws Exception {
- return createTransformEvaluator(
- (AppliedPTransform) application, inputBundle, evaluationContext);
- }
-
- private <InputT> TransformEvaluator<InputT> createTransformEvaluator(
- AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>> transform,
- CommittedBundle<?> inputBundle,
- InProcessEvaluationContext evaluationContext) {
- WindowFn<? super InputT, ?> fn = transform.getTransform().getWindowFn();
- UncommittedBundle<InputT> outputBundle =
- evaluationContext.createBundle(inputBundle, transform.getOutput());
- if (fn == null) {
- return PassthroughTransformEvaluator.create(transform, outputBundle);
- }
- return new WindowIntoEvaluator<>(transform, fn, outputBundle);
- }
-
- private static class WindowIntoEvaluator<InputT> implements TransformEvaluator<InputT> {
- private final AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>>
- transform;
- private final WindowFn<InputT, ?> windowFn;
- private final UncommittedBundle<InputT> outputBundle;
-
- @SuppressWarnings("unchecked")
- public WindowIntoEvaluator(
- AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>> transform,
- WindowFn<? super InputT, ?> windowFn,
- UncommittedBundle<InputT> outputBundle) {
- this.outputBundle = outputBundle;
- this.transform = transform;
- // Safe contravariant cast
- this.windowFn = (WindowFn<InputT, ?>) windowFn;
- }
-
- @Override
- public void processElement(WindowedValue<InputT> element) throws Exception {
- Collection<? extends BoundedWindow> windows = assignWindows(windowFn, element);
- outputBundle.add(
- WindowedValue.<InputT>of(
- element.getValue(), element.getTimestamp(), windows, PaneInfo.NO_FIRING));
- }
-
- private <W extends BoundedWindow> Collection<? extends BoundedWindow> assignWindows(
- WindowFn<InputT, W> windowFn, WindowedValue<InputT> element) throws Exception {
- WindowFn<InputT, W>.AssignContext assignContext =
- new InProcessAssignContext<>(windowFn, element);
- Collection<? extends BoundedWindow> windows = windowFn.assignWindows(assignContext);
- return windows;
- }
-
- @Override
- public InProcessTransformResult finishBundle() throws Exception {
- return StepTransformResult.withoutHold(transform).addOutput(outputBundle).build();
- }
- }
-
- private static class InProcessAssignContext<InputT, W extends BoundedWindow>
- extends WindowFn<InputT, W>.AssignContext {
- private final WindowedValue<InputT> value;
-
- public InProcessAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) {
- fn.super();
- this.value = value;
- }
-
- @Override
- public InputT element() {
- return value.getValue();
- }
-
- @Override
- public Instant timestamp() {
- return value.getTimestamp();
- }
-
- @Override
- public Collection<? extends BoundedWindow> windows() {
- return value.getWindows();
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/AvroIOShardedWriteFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/AvroIOShardedWriteFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/AvroIOShardedWriteFactoryTest.java
deleted file mode 100644
index 43367dd..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/AvroIOShardedWriteFactoryTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.theInstance;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.sdk.io.AvroIO;
-import org.apache.beam.sdk.io.AvroIOTest;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-
-import org.hamcrest.Matchers;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.File;
-
-/**
- * Tests for {@link AvroIOShardedWriteFactory}.
- */
-@RunWith(JUnit4.class)
-public class AvroIOShardedWriteFactoryTest {
-
- @Rule public TemporaryFolder tmp = new TemporaryFolder();
- private AvroIOShardedWriteFactory factory;
-
- @Before
- public void setup() {
- factory = new AvroIOShardedWriteFactory();
- }
-
- @Test
- public void originalWithoutShardingReturnsOriginal() throws Exception {
- File file = tmp.newFile("foo");
- PTransform<PCollection<String>, PDone> original =
- AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath()).withoutSharding();
- PTransform<PCollection<String>, PDone> overridden = factory.override(original);
-
- assertThat(overridden, theInstance(original));
- }
-
- @Test
- public void originalShardingNotSpecifiedReturnsOriginal() throws Exception {
- File file = tmp.newFile("foo");
- PTransform<PCollection<String>, PDone> original =
- AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath());
- PTransform<PCollection<String>, PDone> overridden = factory.override(original);
-
- assertThat(overridden, theInstance(original));
- }
-
- @Test
- public void originalShardedToOneReturnsExplicitlySharded() throws Exception {
- File file = tmp.newFile("foo");
- AvroIO.Write.Bound<String> original =
- AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath()).withNumShards(1);
- PTransform<PCollection<String>, PDone> overridden = factory.override(original);
-
- assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(original)));
-
- TestPipeline p = TestPipeline.create();
- String[] elems = new String[] {"foo", "bar", "baz"};
- p.apply(Create.<String>of(elems)).apply(overridden);
-
- file.delete();
-
- p.run();
- AvroIOTest.assertTestOutputs(elems, 1, file.getAbsolutePath(), original.getShardNameTemplate());
- }
-
- @Test
- public void originalShardedToManyReturnsExplicitlySharded() throws Exception {
- File file = tmp.newFile("foo");
- AvroIO.Write.Bound<String> original =
- AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath()).withNumShards(3);
- PTransform<PCollection<String>, PDone> overridden = factory.override(original);
-
- assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(original)));
-
- TestPipeline p = TestPipeline.create();
- String[] elems = new String[] {"foo", "bar", "baz", "spam", "ham", "eggs"};
- p.apply(Create.<String>of(elems)).apply(overridden);
-
- file.delete();
- p.run();
- AvroIOTest.assertTestOutputs(elems, 3, file.getAbsolutePath(), original.getShardNameTemplate());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
deleted file mode 100644
index 146dd98..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.emptyIterable;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.coders.BigEndianLongCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
-import org.apache.beam.sdk.io.CountingSource;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.Read.Bounded;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.common.collect.ImmutableList;
-
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-/**
- * Tests for {@link BoundedReadEvaluatorFactory}.
- */
-@RunWith(JUnit4.class)
-public class BoundedReadEvaluatorFactoryTest {
- private BoundedSource<Long> source;
- private PCollection<Long> longs;
- private TransformEvaluatorFactory factory;
- @Mock private InProcessEvaluationContext context;
- private BundleFactory bundleFactory;
-
- @Before
- public void setup() {
- MockitoAnnotations.initMocks(this);
- source = CountingSource.upTo(10L);
- TestPipeline p = TestPipeline.create();
- longs = p.apply(Read.from(source));
-
- factory = new BoundedReadEvaluatorFactory();
- bundleFactory = InProcessBundleFactory.create();
- }
-
- @Test
- public void boundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception {
- UncommittedBundle<Long> output = bundleFactory.createRootBundle(longs);
- when(context.createRootBundle(longs)).thenReturn(output);
-
- TransformEvaluator<?> evaluator =
- factory.forApplication(longs.getProducingTransformInternal(), null, context);
- InProcessTransformResult result = evaluator.finishBundle();
- assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
- assertThat(
- output.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements(),
- containsInAnyOrder(
- gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)));
- }
-
- /**
- * Demonstrate that acquiring multiple {@link TransformEvaluator TransformEvaluators} for the same
- * {@link Bounded Read.Bounded} application with the same evaluation context only produces the
- * elements once.
- */
- @Test
- public void boundedSourceInMemoryTransformEvaluatorAfterFinishIsEmpty() throws Exception {
- UncommittedBundle<Long> output = bundleFactory.createRootBundle(longs);
- when(context.createRootBundle(longs)).thenReturn(output);
-
- TransformEvaluator<?> evaluator =
- factory.forApplication(longs.getProducingTransformInternal(), null, context);
- InProcessTransformResult result = evaluator.finishBundle();
- assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
- Iterable<? extends WindowedValue<Long>> outputElements =
- output.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements();
- assertThat(
- outputElements,
- containsInAnyOrder(
- gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)));
-
- UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs);
- when(context.createRootBundle(longs)).thenReturn(secondOutput);
- TransformEvaluator<?> secondEvaluator =
- factory.forApplication(longs.getProducingTransformInternal(), null, context);
- InProcessTransformResult secondResult = secondEvaluator.finishBundle();
- assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
- assertThat(secondResult.getOutputBundles(), emptyIterable());
- assertThat(
- secondOutput.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements(), emptyIterable());
- assertThat(
- outputElements,
- containsInAnyOrder(
- gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)));
- }
-
- /**
- * Demonstrates that acquiring multiple evaluators from the factory are independent, but
- * the elements in the source are only produced once.
- */
- @Test
- public void boundedSourceEvaluatorSimultaneousEvaluations() throws Exception {
- UncommittedBundle<Long> output = bundleFactory.createRootBundle(longs);
- UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs);
- when(context.createRootBundle(longs)).thenReturn(output).thenReturn(secondOutput);
-
- // create both evaluators before finishing either.
- TransformEvaluator<?> evaluator =
- factory.forApplication(longs.getProducingTransformInternal(), null, context);
- TransformEvaluator<?> secondEvaluator =
- factory.forApplication(longs.getProducingTransformInternal(), null, context);
-
- InProcessTransformResult secondResult = secondEvaluator.finishBundle();
-
- InProcessTransformResult result = evaluator.finishBundle();
- assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
- Iterable<? extends WindowedValue<Long>> outputElements =
- output.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements();
-
- assertThat(
- outputElements,
- containsInAnyOrder(
- gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)));
- assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
- assertThat(secondResult.getOutputBundles(), emptyIterable());
- assertThat(
- secondOutput.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements(), emptyIterable());
- assertThat(
- outputElements,
- containsInAnyOrder(
- gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)));
- }
-
- @Test
- public void boundedSourceEvaluatorClosesReader() throws Exception {
- TestSource<Long> source = new TestSource<>(BigEndianLongCoder.of(), 1L, 2L, 3L);
-
- TestPipeline p = TestPipeline.create();
- PCollection<Long> pcollection = p.apply(Read.from(source));
- AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
-
- UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection);
- when(context.createRootBundle(pcollection)).thenReturn(output);
-
- TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
- evaluator.finishBundle();
- CommittedBundle<Long> committed = output.commit(Instant.now());
- assertThat(committed.getElements(), containsInAnyOrder(gw(2L), gw(3L), gw(1L)));
- assertThat(TestSource.readerClosed, is(true));
- }
-
- @Test
- public void boundedSourceEvaluatorNoElementsClosesReader() throws Exception {
- TestSource<Long> source = new TestSource<>(BigEndianLongCoder.of());
-
- TestPipeline p = TestPipeline.create();
- PCollection<Long> pcollection = p.apply(Read.from(source));
- AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
-
- UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection);
- when(context.createRootBundle(pcollection)).thenReturn(output);
-
- TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
- evaluator.finishBundle();
- CommittedBundle<Long> committed = output.commit(Instant.now());
- assertThat(committed.getElements(), emptyIterable());
- assertThat(TestSource.readerClosed, is(true));
- }
-
- private static class TestSource<T> extends BoundedSource<T> {
- private static boolean readerClosed;
- private final Coder<T> coder;
- private final T[] elems;
-
- public TestSource(Coder<T> coder, T... elems) {
- this.elems = elems;
- this.coder = coder;
- readerClosed = false;
- }
-
- @Override
- public List<? extends BoundedSource<T>> splitIntoBundles(
- long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
- return ImmutableList.of(this);
- }
-
- @Override
- public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
- return 0;
- }
-
- @Override
- public boolean producesSortedKeys(PipelineOptions options) throws Exception {
- return false;
- }
-
- @Override
- public BoundedSource.BoundedReader<T> createReader(PipelineOptions options) throws IOException {
- return new TestReader<>(this, elems);
- }
-
- @Override
- public void validate() {
- }
-
- @Override
- public Coder<T> getDefaultOutputCoder() {
- return coder;
- }
- }
-
- private static class TestReader<T> extends BoundedReader<T> {
- private final BoundedSource<T> source;
- private final List<T> elems;
- private int index;
-
- public TestReader(BoundedSource<T> source, T... elems) {
- this.source = source;
- this.elems = Arrays.asList(elems);
- this.index = -1;
- }
-
- @Override
- public BoundedSource<T> getCurrentSource() {
- return source;
- }
-
- @Override
- public boolean start() throws IOException {
- return advance();
- }
-
- @Override
- public boolean advance() throws IOException {
- if (elems.size() > index + 1) {
- index++;
- return true;
- }
- return false;
- }
-
- @Override
- public T getCurrent() throws NoSuchElementException {
- return elems.get(index);
- }
-
- @Override
- public void close() throws IOException {
- TestSource.readerClosed = true;
- }
- }
-
- private static WindowedValue<Long> gw(Long elem) {
- return WindowedValue.valueInGlobalWindow(elem);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/CommittedResultTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/CommittedResultTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/CommittedResultTest.java
deleted file mode 100644
index c888a65..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/CommittedResultTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.runners.inprocess;
-
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-
-import com.google.common.collect.ImmutableList;
-
-import org.hamcrest.Matchers;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Tests for {@link CommittedResult}.
- */
-@RunWith(JUnit4.class)
-public class CommittedResultTest implements Serializable {
- private transient TestPipeline p = TestPipeline.create();
- private transient AppliedPTransform<?, ?, ?> transform =
- AppliedPTransform.of("foo", p.begin(), PDone.in(p), new PTransform<PBegin, PDone>() {
- });
- private transient BundleFactory bundleFactory = InProcessBundleFactory.create();
-
- @Test
- public void getTransformExtractsFromResult() {
- CommittedResult result =
- CommittedResult.create(StepTransformResult.withoutHold(transform).build(),
- Collections.<InProcessPipelineRunner.CommittedBundle<?>>emptyList());
-
- assertThat(result.getTransform(), Matchers.<AppliedPTransform<?, ?, ?>>equalTo(transform));
- }
-
- @Test
- public void getOutputsEqualInput() {
- List<? extends InProcessPipelineRunner.CommittedBundle<?>> outputs =
- ImmutableList.of(bundleFactory.createRootBundle(PCollection.createPrimitiveOutputInternal(p,
- WindowingStrategy.globalDefault(),
- PCollection.IsBounded.BOUNDED)).commit(Instant.now()),
- bundleFactory.createRootBundle(PCollection.createPrimitiveOutputInternal(p,
- WindowingStrategy.globalDefault(),
- PCollection.IsBounded.UNBOUNDED)).commit(Instant.now()));
- CommittedResult result =
- CommittedResult.create(StepTransformResult.withoutHold(transform).build(), outputs);
-
- assertThat(result.getOutputs(), Matchers.containsInAnyOrder(outputs.toArray()));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java
deleted file mode 100644
index aef4845..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static org.hamcrest.Matchers.emptyIterable;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.sdk.io.CountingInput;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.PValue;
-
-import org.hamcrest.Matchers;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * Tests for {@link ConsumerTrackingPipelineVisitor}.
- */
-@RunWith(JUnit4.class)
-public class ConsumerTrackingPipelineVisitorTest implements Serializable {
- @Rule public transient ExpectedException thrown = ExpectedException.none();
-
- private transient TestPipeline p = TestPipeline.create();
- private transient ConsumerTrackingPipelineVisitor visitor = new ConsumerTrackingPipelineVisitor();
-
- @Test
- public void getViewsReturnsViews() {
- PCollectionView<List<String>> listView =
- p.apply("listCreate", Create.of("foo", "bar"))
- .apply(
- ParDo.of(
- new DoFn<String, String>() {
- @Override
- public void processElement(DoFn<String, String>.ProcessContext c)
- throws Exception {
- c.output(Integer.toString(c.element().length()));
- }
- }))
- .apply(View.<String>asList());
- PCollectionView<Object> singletonView =
- p.apply("singletonCreate", Create.<Object>of(1, 2, 3)).apply(View.<Object>asSingleton());
- p.traverseTopologically(visitor);
- assertThat(
- visitor.getViews(),
- Matchers.<PCollectionView<?>>containsInAnyOrder(listView, singletonView));
- }
-
- @Test
- public void getRootTransformsContainsPBegins() {
- PCollection<String> created = p.apply(Create.of("foo", "bar"));
- PCollection<Long> counted = p.apply(CountingInput.upTo(1234L));
- PCollection<Long> unCounted = p.apply(CountingInput.unbounded());
- p.traverseTopologically(visitor);
- assertThat(
- visitor.getRootTransforms(),
- Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(
- created.getProducingTransformInternal(),
- counted.getProducingTransformInternal(),
- unCounted.getProducingTransformInternal()));
- }
-
- @Test
- public void getRootTransformsContainsEmptyFlatten() {
- PCollection<String> empty =
- PCollectionList.<String>empty(p).apply(Flatten.<String>pCollections());
- p.traverseTopologically(visitor);
- assertThat(
- visitor.getRootTransforms(),
- Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(
- empty.getProducingTransformInternal()));
- }
-
- @Test
- public void getValueToConsumersSucceeds() {
- PCollection<String> created = p.apply(Create.of("1", "2", "3"));
- PCollection<String> transformed =
- created.apply(
- ParDo.of(
- new DoFn<String, String>() {
- @Override
- public void processElement(DoFn<String, String>.ProcessContext c)
- throws Exception {
- c.output(Integer.toString(c.element().length()));
- }
- }));
-
- PCollection<String> flattened =
- PCollectionList.of(created).and(transformed).apply(Flatten.<String>pCollections());
-
- p.traverseTopologically(visitor);
-
- assertThat(
- visitor.getValueToConsumers().get(created),
- Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(
- transformed.getProducingTransformInternal(),
- flattened.getProducingTransformInternal()));
- assertThat(
- visitor.getValueToConsumers().get(transformed),
- Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(
- flattened.getProducingTransformInternal()));
- assertThat(visitor.getValueToConsumers().get(flattened), emptyIterable());
- }
-
- @Test
- public void getUnfinalizedPValuesContainsDanglingOutputs() {
- PCollection<String> created = p.apply(Create.of("1", "2", "3"));
- PCollection<String> transformed =
- created.apply(
- ParDo.of(
- new DoFn<String, String>() {
- @Override
- public void processElement(DoFn<String, String>.ProcessContext c)
- throws Exception {
- c.output(Integer.toString(c.element().length()));
- }
- }));
-
- p.traverseTopologically(visitor);
- assertThat(visitor.getUnfinalizedPValues(), Matchers.<PValue>contains(transformed));
- }
-
- @Test
- public void getUnfinalizedPValuesEmpty() {
- p.apply(Create.of("1", "2", "3"))
- .apply(
- ParDo.of(
- new DoFn<String, String>() {
- @Override
- public void processElement(DoFn<String, String>.ProcessContext c)
- throws Exception {
- c.output(Integer.toString(c.element().length()));
- }
- }))
- .apply(
- new PTransform<PInput, PDone>() {
- @Override
- public PDone apply(PInput input) {
- return PDone.in(input.getPipeline());
- }
- });
-
- p.traverseTopologically(visitor);
- assertThat(visitor.getUnfinalizedPValues(), emptyIterable());
- }
-
- @Test
- public void getStepNamesContainsAllTransforms() {
- PCollection<String> created = p.apply(Create.of("1", "2", "3"));
- PCollection<String> transformed =
- created.apply(
- ParDo.of(
- new DoFn<String, String>() {
- @Override
- public void processElement(DoFn<String, String>.ProcessContext c)
- throws Exception {
- c.output(Integer.toString(c.element().length()));
- }
- }));
- PDone finished =
- transformed.apply(
- new PTransform<PInput, PDone>() {
- @Override
- public PDone apply(PInput input) {
- return PDone.in(input.getPipeline());
- }
- });
-
- p.traverseTopologically(visitor);
- assertThat(
- visitor.getStepNames(),
- Matchers.<AppliedPTransform<?, ?, ?>, String>hasEntry(
- created.getProducingTransformInternal(), "s0"));
- assertThat(
- visitor.getStepNames(),
- Matchers.<AppliedPTransform<?, ?, ?>, String>hasEntry(
- transformed.getProducingTransformInternal(), "s1"));
- assertThat(
- visitor.getStepNames(),
- Matchers.<AppliedPTransform<?, ?, ?>, String>hasEntry(
- finished.getProducingTransformInternal(), "s2"));
- }
-
- @Test
- public void traverseMultipleTimesThrows() {
- p.apply(Create.of(1, 2, 3));
-
- p.traverseTopologically(visitor);
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage(ConsumerTrackingPipelineVisitor.class.getSimpleName());
- thrown.expectMessage("is finalized");
- p.traverseTopologically(visitor);
- }
-
- @Test
- public void traverseIndependentPathsSucceeds() {
- p.apply("left", Create.of(1, 2, 3));
- p.apply("right", Create.of("foo", "bar", "baz"));
-
- p.traverseTopologically(visitor);
- }
-
- @Test
- public void getRootTransformsWithoutVisitingThrows() {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("completely traversed");
- thrown.expectMessage("getRootTransforms");
- visitor.getRootTransforms();
- }
- @Test
- public void getStepNamesWithoutVisitingThrows() {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("completely traversed");
- thrown.expectMessage("getStepNames");
- visitor.getStepNames();
- }
- @Test
- public void getUnfinalizedPValuesWithoutVisitingThrows() {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("completely traversed");
- thrown.expectMessage("getUnfinalizedPValues");
- visitor.getUnfinalizedPValues();
- }
-
- @Test
- public void getValueToConsumersWithoutVisitingThrows() {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("completely traversed");
- thrown.expectMessage("getValueToConsumers");
- visitor.getValueToConsumers();
- }
-
- @Test
- public void getViewsWithoutVisitingThrows() {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("completely traversed");
- thrown.expectMessage("getViews");
- visitor.getViews();
- }
-}