You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/05/03 21:12:19 UTC
[3/3] beam git commit: Reduce visibility of DirectRunner classes
Reduce visibility of DirectRunner classes
Move inner classes of the DirectRunner to reduce total API Surface.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9edd8599
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9edd8599
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9edd8599
Branch: refs/heads/master
Commit: 9edd8599c28228cd5d7d5df1084f7e63684964d8
Parents: b263cb7
Author: Thomas Groh <tg...@google.com>
Authored: Wed Apr 26 13:38:37 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed May 3 14:12:05 2017 -0700
----------------------------------------------------------------------
.../direct/AbstractModelEnforcement.java | 1 -
.../direct/BoundedReadEvaluatorFactory.java | 2 -
.../beam/runners/direct/BundleFactory.java | 3 +-
.../org/apache/beam/runners/direct/Clock.java | 2 +-
.../runners/direct/CloningBundleFactory.java | 2 -
.../beam/runners/direct/CommittedBundle.java | 82 +++++++++++++
.../beam/runners/direct/CommittedResult.java | 1 -
.../beam/runners/direct/CompletionCallback.java | 1 -
.../CopyOnAccessInMemoryStateInternals.java | 2 +-
.../apache/beam/runners/direct/DirectGraph.java | 12 +-
.../beam/runners/direct/DirectMetrics.java | 1 -
.../beam/runners/direct/DirectRunner.java | 122 ++-----------------
.../beam/runners/direct/EmptyInputProvider.java | 1 -
.../runners/direct/EmptyTransformEvaluator.java | 50 --------
.../beam/runners/direct/EvaluationContext.java | 3 -
.../runners/direct/ExecutorServiceFactory.java | 2 +-
.../direct/ExecutorServiceParallelExecutor.java | 1 -
.../runners/direct/FlattenEvaluatorFactory.java | 2 -
.../GroupAlsoByWindowEvaluatorFactory.java | 2 -
.../direct/GroupByKeyOnlyEvaluatorFactory.java | 2 -
.../ImmutabilityCheckingBundleFactory.java | 2 -
.../direct/ImmutabilityEnforcementFactory.java | 1 -
.../direct/ImmutableListBundleFactory.java | 2 -
.../beam/runners/direct/ModelEnforcement.java | 10 +-
.../runners/direct/ModelEnforcementFactory.java | 3 +-
.../beam/runners/direct/NanosOffsetClock.java | 2 +-
.../runners/direct/PCollectionViewWriter.java | 34 ++++++
.../beam/runners/direct/ParDoEvaluator.java | 1 -
.../runners/direct/ParDoEvaluatorFactory.java | 1 -
.../direct/PassthroughTransformEvaluator.java | 1 -
.../beam/runners/direct/PipelineExecutor.java | 1 -
.../beam/runners/direct/RootInputProvider.java | 1 -
.../runners/direct/RootProviderRegistry.java | 1 -
...littableProcessElementsEvaluatorFactory.java | 1 -
.../direct/StatefulParDoEvaluatorFactory.java | 1 -
.../runners/direct/StepTransformResult.java | 3 +-
.../direct/TestStreamEvaluatorFactory.java | 2 -
.../beam/runners/direct/TransformEvaluator.java | 3 +-
.../direct/TransformEvaluatorFactory.java | 6 +-
.../direct/TransformEvaluatorRegistry.java | 1 -
.../beam/runners/direct/TransformExecutor.java | 1 -
.../beam/runners/direct/TransformResult.java | 3 +-
.../direct/UnboundedReadEvaluatorFactory.java | 2 -
.../beam/runners/direct/UncommittedBundle.java | 57 +++++++++
.../runners/direct/ViewEvaluatorFactory.java | 3 +-
.../beam/runners/direct/WatermarkManager.java | 3 +-
.../runners/direct/WindowEvaluatorFactory.java | 2 -
.../direct/BoundedReadEvaluatorFactoryTest.java | 2 -
.../direct/CloningBundleFactoryTest.java | 2 -
.../runners/direct/CommittedResultTest.java | 12 +-
.../beam/runners/direct/DirectMetricsTest.java | 1 -
.../runners/direct/EvaluationContextTest.java | 3 -
.../direct/FlattenEvaluatorFactoryTest.java | 2 -
.../direct/GroupByKeyEvaluatorFactoryTest.java | 2 -
.../GroupByKeyOnlyEvaluatorFactoryTest.java | 2 -
.../ImmutabilityCheckingBundleFactoryTest.java | 2 -
.../ImmutabilityEnforcementFactoryTest.java | 1 -
.../direct/ImmutableListBundleFactoryTest.java | 2 -
.../beam/runners/direct/ParDoEvaluatorTest.java | 1 -
.../StatefulParDoEvaluatorFactoryTest.java | 2 -
.../runners/direct/StepTransformResultTest.java | 1 -
.../direct/TestStreamEvaluatorFactoryTest.java | 1 -
.../runners/direct/TransformExecutorTest.java | 1 -
.../UnboundedReadEvaluatorFactoryTest.java | 2 -
.../direct/ViewEvaluatorFactoryTest.java | 2 -
.../runners/direct/WatermarkManagerTest.java | 2 -
.../direct/WindowEvaluatorFactoryTest.java | 2 -
67 files changed, 214 insertions(+), 273 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
index f09164b..40faf5a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.runners.direct;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.util.WindowedValue;
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index 0c2afe8..26f9851 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -31,8 +31,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.annotation.Nullable;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.runners.direct.StepTransformResult.Builder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
index b1cb9b1..e39b5d8 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
@@ -18,14 +18,13 @@
package org.apache.beam.runners.direct;
import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
/**
* A factory that creates {@link UncommittedBundle UncommittedBundles}.
*/
-public interface BundleFactory {
+interface BundleFactory {
/**
* Create an {@link UncommittedBundle} from an empty input. Elements added to the bundle do not
* belong to a {@link PCollection}.
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/Clock.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/Clock.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/Clock.java
index 88f8aab..1a93c62 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/Clock.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/Clock.java
@@ -22,7 +22,7 @@ import org.joda.time.Instant;
/**
* Access to the current time.
*/
-public interface Clock {
+interface Clock {
/**
* Returns the current time as an {@link Instant}.
*/
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java
index 33241e3..68b059f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java
@@ -18,8 +18,6 @@
package org.apache.beam.runners.direct;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.util.CoderUtils;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java
new file mode 100644
index 0000000..79a96fe
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Instant;
+
+/**
+ * Part of a {@link PCollection}. Elements are output to an {@link UncommittedBundle}, which will
+ * eventually be committed. Committed elements are executed by the {@link PTransform PTransforms}
+ * that consume the {@link PCollection} this bundle is
+ * a part of at a later point.
+ * @param <T> the type of elements contained within this bundle
+ */
+interface CommittedBundle<T> {
+ /**
+ * Returns the PCollection that the elements of this bundle belong to.
+ */
+ @Nullable
+ PCollection<T> getPCollection();
+
+ /**
+ * Returns the key that was output in the most recent {@link GroupByKey} in the
+ * execution of this bundle.
+ */
+ StructuralKey<?> getKey();
+
+ /**
+ * Returns an {@link Iterable} containing all of the elements that have been added to this
+ * {@link CommittedBundle}.
+ */
+ Iterable<WindowedValue<T>> getElements();
+
+ /**
+ * Returns the minimum timestamp among all of the elements of this {@link CommittedBundle}.
+ */
+ Instant getMinTimestamp();
+
+ /**
+ * Returns the processing time output watermark at the time the producing {@link PTransform}
+ * committed this bundle. Downstream synchronized processing time watermarks cannot progress
+ * past this point before consuming this bundle.
+ *
+ * <p>This value is no greater than the earliest incomplete processing time or synchronized
+ * processing time {@link TimerData timer} at the time this bundle was committed, including any
+ * timers that fired to produce this bundle.
+ */
+ Instant getSynchronizedProcessingOutputWatermark();
+
+ /**
+ * Return a new {@link CommittedBundle} that is like this one, except calls to
+ * {@link #getElements()} will return the provided elements. This bundle is unchanged.
+ *
+ * <p>The value of the {@link #getSynchronizedProcessingOutputWatermark() synchronized
+ * processing output watermark} of the returned {@link CommittedBundle} is equal to the value
+ * returned from the current bundle. This is used to ensure a {@link PTransform} that could not
+ * complete processing on input elements properly holds the synchronized processing time to the
+ * appropriate value.
+ */
+ CommittedBundle<T> withElements(Iterable<WindowedValue<T>> elements);
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
index 4db7e18..99abdd3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
@@ -21,7 +21,6 @@ package org.apache.beam.runners.direct;
import com.google.auto.value.AutoValue;
import java.util.Set;
import javax.annotation.Nullable;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
index 766259d..7b5ef4b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.runners.direct;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
index ef3a053..d2af93c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
@@ -60,7 +60,7 @@ import org.joda.time.Instant;
* of {@link InMemoryState}. Whenever state that exists in the underlying {@link StateTable} is
* accessed, an independent copy will be created within this table.
*/
-public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals {
+class CopyOnAccessInMemoryStateInternals<K> implements StateInternals {
private final CopyOnAccessInMemoryStateTable table;
private K key;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java
index f208f6e..e163d83 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java
@@ -63,27 +63,27 @@ class DirectGraph {
this.stepNames = stepNames;
}
- public AppliedPTransform<?, ?, ?> getProducer(PValue produced) {
+ AppliedPTransform<?, ?, ?> getProducer(PValue produced) {
return producers.get(produced);
}
- public List<AppliedPTransform<?, ?, ?>> getPrimitiveConsumers(PValue consumed) {
+ List<AppliedPTransform<?, ?, ?>> getPrimitiveConsumers(PValue consumed) {
return primitiveConsumers.get(consumed);
}
- public Set<AppliedPTransform<?, ?, ?>> getRootTransforms() {
+ Set<AppliedPTransform<?, ?, ?>> getRootTransforms() {
return rootTransforms;
}
- public Set<PCollectionView<?>> getViews() {
+ Set<PCollectionView<?>> getViews() {
return views;
}
- public String getStepName(AppliedPTransform<?, ?, ?> step) {
+ String getStepName(AppliedPTransform<?, ?, ?> step) {
return stepNames.get(step);
}
- public Collection<AppliedPTransform<?, ?, ?>> getPrimitiveTransforms() {
+ Collection<AppliedPTransform<?, ?, ?>> getPrimitiveTransforms() {
return stepNames.keySet();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
index fb126fb..b6ca492 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
@@ -30,7 +30,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.GuardedBy;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.metrics.DistributionData;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.GaugeData;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index e063bc3..c6168b3e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -27,9 +27,7 @@ import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import javax.annotation.Nullable;
import org.apache.beam.runners.core.SplittableParDo.GBKIntoKeyedWorkItems;
-import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
@@ -49,119 +47,14 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;
-import org.joda.time.Instant;
/**
* An In-Memory implementation of the Dataflow Programming Model. Supports Unbounded
* {@link PCollection PCollections}.
*/
public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
- /**
- * Part of a {@link PCollection}. Elements are output to a bundle, which will cause them to be
- * executed by {@link PTransform PTransforms} that consume the {@link PCollection} this bundle is
- * a part of at a later point. This is an uncommitted bundle and can have elements added to it.
- *
- * @param <T> the type of elements that can be added to this bundle
- */
- interface UncommittedBundle<T> {
- /**
- * Returns the PCollection that the elements of this {@link UncommittedBundle} belong to.
- */
- @Nullable
- PCollection<T> getPCollection();
-
- /**
- * Outputs an element to this bundle.
- *
- * @param element the element to add to this bundle
- * @return this bundle
- */
- UncommittedBundle<T> add(WindowedValue<T> element);
-
- /**
- * Commits this {@link UncommittedBundle}, returning an immutable {@link CommittedBundle}
- * containing all of the elements that were added to it. The {@link #add(WindowedValue)} method
- * will throw an {@link IllegalStateException} if called after a call to commit.
- * @param synchronizedProcessingTime the synchronized processing time at which this bundle was
- * committed
- */
- CommittedBundle<T> commit(Instant synchronizedProcessingTime);
- }
-
- /**
- * Part of a {@link PCollection}. Elements are output to an {@link UncommittedBundle}, which will
- * eventually committed. Committed elements are executed by the {@link PTransform PTransforms}
- * that consume the {@link PCollection} this bundle is
- * a part of at a later point.
- * @param <T> the type of elements contained within this bundle
- */
- interface CommittedBundle<T> {
- /**
- * Returns the PCollection that the elements of this bundle belong to.
- */
- @Nullable
- PCollection<T> getPCollection();
-
- /**
- * Returns the key that was output in the most recent {@link GroupByKey} in the
- * execution of this bundle.
- */
- StructuralKey<?> getKey();
-
- /**
- * Returns an {@link Iterable} containing all of the elements that have been added to this
- * {@link CommittedBundle}.
- */
- Iterable<WindowedValue<T>> getElements();
-
- /**
- * Returns the minimum timestamp among all of the elements of this {@link CommittedBundle}.
- */
- Instant getMinTimestamp();
-
- /**
- * Returns the processing time output watermark at the time the producing {@link PTransform}
- * committed this bundle. Downstream synchronized processing time watermarks cannot progress
- * past this point before consuming this bundle.
- *
- * <p>This value is no greater than the earliest incomplete processing time or synchronized
- * processing time {@link TimerData timer} at the time this bundle was committed, including any
- * timers that fired to produce this bundle.
- */
- Instant getSynchronizedProcessingOutputWatermark();
-
- /**
- * Return a new {@link CommittedBundle} that is like this one, except calls to
- * {@link #getElements()} will return the provided elements. This bundle is unchanged.
- *
- * <p>The value of the {@link #getSynchronizedProcessingOutputWatermark() synchronized
- * processing output watermark} of the returned {@link CommittedBundle} is equal to the value
- * returned from the current bundle. This is used to ensure a {@link PTransform} that could not
- * complete processing on input elements properly holds the synchronized processing time to the
- * appropriate value.
- */
- CommittedBundle<T> withElements(Iterable<WindowedValue<T>> elements);
- }
-
- /**
- * A {@link PCollectionViewWriter} is responsible for writing contents of a {@link PCollection} to
- * a storage mechanism that can be read from while constructing a {@link PCollectionView}.
- *
- * @param <ElemT> the type of elements the input {@link PCollection} contains.
- * @param <ViewT> the type of the PCollectionView this writer writes to.
- */
- interface PCollectionViewWriter<ElemT, ViewT> {
- void add(Iterable<WindowedValue<ElemT>> values);
- }
-
- /** The set of {@link PTransform PTransforms} that execute a UDF. Useful for some enforcements. */
- private static final Set<Class<? extends PTransform>> CONTAINS_UDF =
- ImmutableSet.of(
- Read.Bounded.class, Read.Unbounded.class, ParDo.SingleOutput.class, MultiOutput.class);
enum Enforcement {
ENCODABILITY {
@@ -177,11 +70,18 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
}
};
+ /**
+ * The set of {@link PTransform PTransforms} that execute a UDF. Useful for some enforcements.
+ */
+ private static final Set<Class<? extends PTransform>> CONTAINS_UDF =
+ ImmutableSet.of(
+ Read.Bounded.class, Read.Unbounded.class, ParDo.SingleOutput.class, MultiOutput.class);
+
public abstract boolean appliesTo(PCollection<?> collection, DirectGraph graph);
////////////////////////////////////////////////////////////////////////////////////////////////
// Utilities for creating enforcements
- public static Set<Enforcement> enabled(DirectOptions options) {
+ static Set<Enforcement> enabled(DirectOptions options) {
EnumSet<Enforcement> enabled = EnumSet.noneOf(Enforcement.class);
if (options.isEnforceEncodability()) {
enabled.add(ENCODABILITY);
@@ -192,7 +92,8 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
return Collections.unmodifiableSet(enabled);
}
- public static BundleFactory bundleFactoryFor(Set<Enforcement> enforcements, DirectGraph graph) {
+ static BundleFactory bundleFactoryFor(
+ Set<Enforcement> enforcements, DirectGraph graph) {
BundleFactory bundleFactory =
enforcements.contains(Enforcement.ENCODABILITY)
? CloningBundleFactory.create()
@@ -430,7 +331,4 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
return NanosOffsetClock.create();
}
}
-
- private static class ComplexParDoMatcher {
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
index 98d4a64..396cdee 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.direct;
import java.util.Collection;
import java.util.Collections;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java
deleted file mode 100644
index 85e5e70..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-
-/**
- * A {@link TransformEvaluator} that ignores all input and produces no output. The result of
- * invoking {@link #finishBundle()} on this evaluator is to return an
- * {@link TransformResult} with no elements and a timestamp hold equal to
- * {@link BoundedWindow#TIMESTAMP_MIN_VALUE}. Because the result contains no elements, this hold
- * will not affect the watermark.
- */
-final class EmptyTransformEvaluator<T> implements TransformEvaluator<T> {
- public static <T> TransformEvaluator<T> create(AppliedPTransform<?, ?, ?> transform) {
- return new EmptyTransformEvaluator<T>(transform);
- }
-
- private final AppliedPTransform<?, ?, ?> transform;
-
- private EmptyTransformEvaluator(AppliedPTransform<?, ?, ?> transform) {
- this.transform = transform;
- }
-
- @Override
- public void processElement(WindowedValue<T> element) throws Exception {}
-
- @Override
- public TransformResult<T> finishBundle() throws Exception {
- return StepTransformResult.<T>withHold(transform, BoundedWindow.TIMESTAMP_MIN_VALUE)
- .build();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 93d6f96..3cdf351 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -35,9 +35,6 @@ import org.apache.beam.runners.core.ExecutionContext;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.direct.CommittedResult.OutputType;
import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
import org.apache.beam.sdk.Pipeline;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceFactory.java
index 91dc258..f9e9fa9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceFactory.java
@@ -25,7 +25,7 @@ import java.util.concurrent.ExecutorService;
* another (e.g., if any executor is shut down the remaining executors should continue to process
* work).
*/
-public interface ExecutorServiceFactory {
+interface ExecutorServiceFactory {
/**
* Create a new {@link ExecutorService}.
*/
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index c802c58..4da62d5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -47,7 +47,6 @@ import javax.annotation.Nullable;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult.State;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
index 7c6d2a1..341ea4b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
@@ -18,8 +18,6 @@
package org.apache.beam.runners.direct;
import com.google.common.collect.Iterables;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Flatten.PCollections;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index d006553..d00e408 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -39,8 +39,6 @@ import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
index ac0b14f..1ea8e76 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
@@ -31,8 +31,6 @@ import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.runners.direct.StepTransformResult.Builder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
index 8d77e25..9aabddc 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
@@ -21,9 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.DirectRunner.Enforcement;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.transforms.DoFn;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
index 85fc374..8880af9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.direct;
import java.util.IdentityHashMap;
import java.util.Map;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.transforms.AppliedPTransform;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
index 36264ee..73734d0 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
@@ -23,8 +23,6 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableList;
import javax.annotation.Nullable;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
index 96dbc2b..d2e9424 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.values.PCollection;
*
* <p>ModelEnforcement is performed on a per-element and per-bundle basis. The
* {@link ModelEnforcement} is provided with the input bundle as part of
- * {@link ModelEnforcementFactory#forBundle(DirectRunner.CommittedBundle, AppliedPTransform)} each
+ * {@link ModelEnforcementFactory#forBundle(CommittedBundle, AppliedPTransform)} each
* element before and after that element is provided to an underlying {@link TransformEvaluator},
* and the output {@link TransformResult} and committed output bundles after the
* {@link TransformEvaluator} has completed.
@@ -37,7 +37,7 @@ import org.apache.beam.sdk.values.PCollection;
* (such as the immutability of input elements). When the element is output or the bundle is
* completed, the required conditions can be enforced across all elements.
*/
-public interface ModelEnforcement<T> {
+interface ModelEnforcement<T> {
/**
* Called before a call to {@link TransformEvaluator#processElement(WindowedValue)} on the
* provided {@link WindowedValue}.
@@ -53,10 +53,10 @@ public interface ModelEnforcement<T> {
/**
* Called after a bundle has been completed and {@link TransformEvaluator#finishBundle()} has been
* called, producing the provided {@link TransformResult} and
- * {@link DirectRunner.CommittedBundle output bundles}.
+ * {@link CommittedBundle output bundles}.
*/
void afterFinish(
- DirectRunner.CommittedBundle<T> input,
+ CommittedBundle<T> input,
TransformResult<T> result,
- Iterable<? extends DirectRunner.CommittedBundle<?>> outputs);
+ Iterable<? extends CommittedBundle<?>> outputs);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java
index e0bbfcb..30f1d20 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.runners.direct;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
/**
@@ -25,6 +24,6 @@ import org.apache.beam.sdk.transforms.AppliedPTransform;
* {@link CommittedBundle bundle}. {@link ModelEnforcement} instances are created before the
* {@link TransformEvaluator} is created.
*/
-public interface ModelEnforcementFactory {
+interface ModelEnforcementFactory {
<T> ModelEnforcement<T> forBundle(CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
index 5a2b18d..5e86f4b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
@@ -23,7 +23,7 @@ import org.joda.time.Instant;
/**
* A {@link Clock} that uses {@link System#nanoTime()} to track the progress of time.
*/
-public class NanosOffsetClock implements Clock {
+class NanosOffsetClock implements Clock {
private final long baseMillis;
private final long nanosAtBaseMillis;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWriter.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWriter.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWriter.java
new file mode 100644
index 0000000..10c6b74
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWriter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * A {@link PCollectionViewWriter} is responsible for writing contents of a {@link PCollection} to
+ * a storage mechanism that can be read from while constructing a {@link PCollectionView}.
+ *
+ * @param <ElemT> the type of elements the input {@link PCollection} contains.
+ * @param <ViewT> the type of the PCollectionView this writer writes to.
+ */
+interface PCollectionViewWriter<ElemT, ViewT> {
+ void add(Iterable<WindowedValue<ElemT>> values);
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 2ea8a91..a3a345f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -29,7 +29,6 @@ import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.DoFn;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index b00c2b6..39595d8 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
index 153af65..c57932c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.runners.direct;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.util.WindowedValue;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java
index 82f59a7..07212c7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java
@@ -18,7 +18,6 @@
package org.apache.beam.runners.direct;
import java.util.Collection;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.transforms.AppliedPTransform;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java
index c3df103..88e0769 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java
@@ -19,7 +19,6 @@
package org.apache.beam.runners.direct;
import java.util.Collection;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
index eb9492c..b06a41c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
@@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.collect.ImmutableMap;
import java.util.Collection;
import java.util.Map;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Flatten.PCollections;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 5f6b4f7..fb3a962 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -33,7 +33,6 @@ import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternalsFactory;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.DoFn;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index 7cf3840..f278e08 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -37,7 +37,6 @@ import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.ParDoMultiOverrideFactory.StatefulParDo;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.AppliedPTransform;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
index 2a2ccab..7e5f824 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
@@ -24,7 +24,6 @@ import java.util.Collection;
import java.util.EnumSet;
import java.util.Set;
import org.apache.beam.runners.direct.CommittedResult.OutputType;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
import org.apache.beam.sdk.metrics.MetricUpdates;
import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -36,7 +35,7 @@ import org.joda.time.Instant;
* An immutable {@link TransformResult}.
*/
@AutoValue
-public abstract class StepTransformResult<InputT> implements TransformResult<InputT> {
+abstract class StepTransformResult<InputT> implements TransformResult<InputT> {
public static <InputT> Builder<InputT> withHold(
AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) {
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index cba754e..b5486c0 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -29,8 +29,6 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.TestStream.ElementEvent;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
index 79c942b..1a7209d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.runners.direct;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.util.WindowedValue;
/**
@@ -26,7 +25,7 @@ import org.apache.beam.sdk.util.WindowedValue;
*
* @param <InputT> the type of elements that will be passed to {@link #processElement}
*/
-public interface TransformEvaluator<InputT> {
+interface TransformEvaluator<InputT> {
/**
* Process an element in the input {@link CommittedBundle}.
*
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
index c7bc46f..c187359 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.transforms.PTransform;
* <p>{@link TransformEvaluatorFactory TransformEvaluatorFactories} will be reused within a single
* execution of a {@link Pipeline} but will not be reused across executions.
*/
-public interface TransformEvaluatorFactory {
+interface TransformEvaluatorFactory {
/**
* Create a new {@link TransformEvaluator} for the application of the {@link PTransform}.
*
@@ -47,13 +47,13 @@ public interface TransformEvaluatorFactory {
*/
@Nullable
<InputT> TransformEvaluator<InputT> forApplication(
- AppliedPTransform<?, ?, ?> application, DirectRunner.CommittedBundle<?> inputBundle)
+ AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle)
throws Exception;
/**
* Cleans up any state maintained by this {@link TransformEvaluatorFactory}. Called after a
* {@link Pipeline} is shut down. No more calls to
- * {@link #forApplication(AppliedPTransform, DirectRunner.CommittedBundle)} will be made after
+ * {@link #forApplication(AppliedPTransform, CommittedBundle)} will be made after
* a call to {@link #cleanup()}.
*/
void cleanup() throws Exception;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index d06c460..a00253a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.runners.core.SplittableParDo;
import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.ParDoMultiOverrideFactory.StatefulParDo;
import org.apache.beam.runners.direct.ViewOverrideFactory.WriteView;
import org.apache.beam.sdk.io.Read;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
index bbc0aae..26c4f5c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -21,7 +21,6 @@ import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Callable;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.metrics.MetricUpdates;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
index 3a95df7..0b0790e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.direct;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.runners.direct.CommittedResult.OutputType;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
import org.apache.beam.sdk.metrics.MetricUpdates;
import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -36,7 +35,7 @@ import org.joda.time.Instant;
* <p>Every transform evaluator has a defined input type, but {@link ParDo} has multiple outputs
* so there is not necessarily a defined output type.
*/
-public interface TransformResult<InputT> {
+interface TransformResult<InputT> {
/**
* Returns the {@link AppliedPTransform} that produced this result.
*
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index d3609f8..922a681 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -29,8 +29,6 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.runners.direct.UnboundedReadDeduplicator.NeverDeduplicator;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.Read.Unbounded;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundle.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundle.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundle.java
new file mode 100644
index 0000000..07fa138
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundle.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Instant;
+
+/**
+ * Part of a {@link PCollection}. Elements are output to a bundle, which will cause them to be
+ * executed by {@link PTransform PTransforms} that consume the {@link PCollection} this bundle is
+ * a part of at a later point. This is an uncommitted bundle and can have elements added to it.
+ *
+ * @param <T> the type of elements that can be added to this bundle
+ */
+interface UncommittedBundle<T> {
+ /**
+ * Returns the {@link PCollection} the elements of this bundle belong to; may be {@code null}.
+ */
+ @Nullable
+ PCollection<T> getPCollection();
+
+ /**
+ * Outputs an element to this bundle.
+ *
+ * @param element the element to add to this bundle
+ * @return this bundle
+ */
+ UncommittedBundle<T> add(WindowedValue<T> element);
+
+ /**
+ * Commits this {@link UncommittedBundle}, returning an immutable {@link CommittedBundle}
+ * containing all of the elements that were added to it. The {@link #add(WindowedValue)} method
+ * will throw an {@link IllegalStateException} if called after a call to commit.
+ * @param synchronizedProcessingTime the synchronized processing time of the commit
+ * @return an immutable {@link CommittedBundle} holding every element added to this bundle
+ */
+ CommittedBundle<T> commit(Instant synchronizedProcessingTime);
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
index 8cbe8fc..f4648e9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -21,7 +21,6 @@ import com.google.common.collect.Iterables;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.direct.CommittedResult.OutputType;
-import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter;
import org.apache.beam.runners.direct.StepTransformResult.Builder;
import org.apache.beam.runners.direct.ViewOverrideFactory.WriteView;
import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -50,7 +49,7 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
@Override
public <T> TransformEvaluator<T> forApplication(
AppliedPTransform<?, ?, ?> application,
- DirectRunner.CommittedBundle<?> inputBundle) {
+ CommittedBundle<?> inputBundle) {
@SuppressWarnings({"cast", "unchecked", "rawtypes"})
TransformEvaluator<T> evaluator = createEvaluator(
(AppliedPTransform) application);
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 8c04362..b576e00 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -54,7 +54,6 @@ import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
@@ -127,7 +126,7 @@ import org.joda.time.Instant;
* Watermark_PCollection = Watermark_Out_ProducingPTransform
* </pre>
*/
-public class WatermarkManager {
+class WatermarkManager {
// The number of updates to apply in #tryApplyPendingUpdates
private static final int MAX_INCREMENTAL_UPDATES = 10;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
index 2550924..30d507b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
@@ -20,8 +20,6 @@ package org.apache.beam.runners.direct;
import com.google.common.collect.Iterables;
import java.util.Collection;
import javax.annotation.Nullable;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
index 2b5b46d..df7c18e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
@@ -39,8 +39,6 @@ import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CountDownLatch;
import org.apache.beam.runners.direct.BoundedReadEvaluatorFactory.BoundedSourceShard;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
index c6054b6..7d037d1 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
@@ -31,8 +31,6 @@ import com.google.common.collect.Iterables;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.KvCoder;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
index 68d6eba..077cd43 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
@@ -73,7 +73,7 @@ public class CommittedResultTest implements Serializable {
CommittedResult.create(
StepTransformResult.withoutHold(transform).build(),
bundleFactory.createBundle(created).commit(Instant.now()),
- Collections.<DirectRunner.CommittedBundle<?>>emptyList(),
+ Collections.<CommittedBundle<?>>emptyList(),
EnumSet.noneOf(OutputType.class));
assertThat(result.getTransform(), Matchers.<AppliedPTransform<?, ?, ?>>equalTo(transform));
@@ -81,7 +81,7 @@ public class CommittedResultTest implements Serializable {
@Test
public void getUncommittedElementsEqualInput() {
- DirectRunner.CommittedBundle<Integer> bundle =
+ CommittedBundle<Integer> bundle =
bundleFactory.createBundle(created)
.add(WindowedValue.valueInGlobalWindow(2))
.commit(Instant.now());
@@ -89,11 +89,11 @@ public class CommittedResultTest implements Serializable {
CommittedResult.create(
StepTransformResult.withoutHold(transform).build(),
bundle,
- Collections.<DirectRunner.CommittedBundle<?>>emptyList(),
+ Collections.<CommittedBundle<?>>emptyList(),
EnumSet.noneOf(OutputType.class));
assertThat(result.getUnprocessedInputs(),
- Matchers.<DirectRunner.CommittedBundle<?>>equalTo(bundle));
+ Matchers.<CommittedBundle<?>>equalTo(bundle));
}
@Test
@@ -102,7 +102,7 @@ public class CommittedResultTest implements Serializable {
CommittedResult.create(
StepTransformResult.withoutHold(transform).build(),
null,
- Collections.<DirectRunner.CommittedBundle<?>>emptyList(),
+ Collections.<CommittedBundle<?>>emptyList(),
EnumSet.noneOf(OutputType.class));
assertThat(result.getUnprocessedInputs(), nullValue());
@@ -110,7 +110,7 @@ public class CommittedResultTest implements Serializable {
@Test
public void getOutputsEqualInput() {
- List<? extends DirectRunner.CommittedBundle<?>> outputs =
+ List<? extends CommittedBundle<?>> outputs =
ImmutableList.of(bundleFactory.createBundle(PCollection.createPrimitiveOutputInternal(p,
WindowingStrategy.globalDefault(),
PCollection.IsBounded.BOUNDED)).commit(Instant.now()),
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
index ee51e9a..d5d0aff 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
@@ -26,7 +26,6 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
import com.google.common.collect.ImmutableList;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.metrics.DistributionData;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.GaugeData;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index bfbcd79..40582d9 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -36,9 +36,6 @@ import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
import org.apache.beam.sdk.coders.ByteArrayCoder;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
index e07c9f9..7dc01e6 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
@@ -24,8 +24,6 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.google.common.collect.Iterables;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
index fefafd0..6dcd5e2 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
@@ -28,8 +28,6 @@ import com.google.common.collect.Multiset;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
index 94514ad..1373219 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
@@ -28,8 +28,6 @@ import com.google.common.collect.Multiset;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
index 838e0bd..95c0ad1 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
@@ -20,8 +20,6 @@ package org.apache.beam.runners.direct;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestPipeline;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
index cd3e9b4..1cd5786 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.direct;
import java.io.Serializable;
import java.util.Collections;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Count;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
index c5ad0cd..4a392db 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
@@ -26,8 +26,6 @@ import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 69dbc22..ef8add9 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -31,7 +31,6 @@ import java.util.Collection;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;