You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by sc...@apache.org on 2019/01/18 21:54:19 UTC
[beam] branch master updated: Minor code refactoring and
improvements.
This is an automated email from the ASF dual-hosted git repository.
scott pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 9214fa9 Minor code refactoring and improvements.
new d4bd8bb Merge pull request #7557: [BEAM-2928]Minor code refactoring and improvements.
9214fa9 is described below
commit 9214fa9422800df92689ce784bc81ce062fb3ede
Author: Ruoyun <hu...@gmail.com>
AuthorDate: Wed Jan 2 12:48:02 2019 -0800
Minor code refactoring and improvements.
---
.../runners/core/construction/graph/SideInputReference.java | 9 +++++++++
.../org/apache/beam/runners/direct/CloningBundleFactory.java | 9 +++++++++
.../apache/beam/runners/direct/ImmutableListBundleFactory.java | 10 ++++++++++
.../org/apache/beam/runners/direct/PCollectionViewWriter.java | 1 +
.../direct/portable/ExecutorServiceParallelExecutor.java | 10 +++++-----
.../beam/runners/direct/portable/PCollectionViewWriter.java | 1 +
.../org/apache/beam/runners/direct/portable/PortableGraph.java | 4 ++++
.../apache/beam/runners/direct/portable/ReferenceRunner.java | 6 ++----
8 files changed, 41 insertions(+), 9 deletions(-)
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SideInputReference.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SideInputReference.java
index 29e2746..e329743 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SideInputReference.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SideInputReference.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.core.construction.graph;
import com.google.auto.value.AutoValue;
+import com.google.common.base.MoreObjects;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
@@ -58,4 +59,12 @@ public abstract class SideInputReference {
public abstract String localName();
/** The PCollection that backs this side input. */
public abstract PCollectionNode collection();
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("Transform", transform().toString())
+ .add("PCollection", collection().toString())
+ .toString();
+ }
}
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java
index 9c35ef2..25571da 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.direct;
+import com.google.common.base.MoreObjects;
import org.apache.beam.runners.local.StructuralKey;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
@@ -92,5 +93,13 @@ class CloningBundleFactory implements BundleFactory {
public CommittedBundle<T> commit(Instant synchronizedProcessingTime) {
return underlying.commit(synchronizedProcessingTime);
}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("Data", underlying.toString())
+ .add("Coder", coder.toString())
+ .toString();
+ }
}
}
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
index f84043a..58484e1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
@@ -21,6 +21,7 @@ import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Precondi
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
import com.google.auto.value.AutoValue;
+import com.google.common.base.MoreObjects;
import java.util.Iterator;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -111,6 +112,15 @@ class ImmutableListBundleFactory implements BundleFactory {
return CommittedImmutableListBundle.create(
pcollection, key, committedElements, minSoFar, synchronizedCompletionTime);
}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("Key", key.toString())
+ .add("PCollection", pcollection)
+ .add("Elements", elements.build())
+ .toString();
+ }
}
@AutoValue
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWriter.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWriter.java
index 6b8577b..f3828d8 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWriter.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWriter.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.values.PCollectionView;
* @param <ElemT> the type of elements the input {@link PCollection} contains.
* @param <ViewT> the type of the PCollectionView this writer writes to.
*/
+@FunctionalInterface
interface PCollectionViewWriter<ElemT, ViewT> {
void add(Iterable<WindowedValue<ElemT>> values);
}
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java
index e444210..1a7a99a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java
@@ -63,7 +63,7 @@ final class ExecutorServiceParallelExecutor
private final ExecutorService executorService;
private final RootProviderRegistry rootRegistry;
- private final TransformEvaluatorRegistry registry;
+ private final TransformEvaluatorRegistry transformRegistry;
private final ExecutableGraph<PTransformNode, PCollectionNode> graph;
private final EvaluationContext evaluationContext;
@@ -89,7 +89,7 @@ final class ExecutorServiceParallelExecutor
private ExecutorServiceParallelExecutor(
int targetParallelism,
RootProviderRegistry rootRegistry,
- TransformEvaluatorRegistry registry,
+ TransformEvaluatorRegistry transformRegistry,
ExecutableGraph<PTransformNode, PCollectionNode> graph,
EvaluationContext context) {
this.targetParallelism = targetParallelism;
@@ -103,7 +103,7 @@ final class ExecutorServiceParallelExecutor
.setNameFormat("direct-runner-worker")
.build());
this.rootRegistry = rootRegistry;
- this.registry = registry;
+ this.transformRegistry = transformRegistry;
this.graph = graph;
this.evaluationContext = context;
@@ -119,7 +119,7 @@ final class ExecutorServiceParallelExecutor
this.visibleUpdates = new QueueMessageReceiver();
parallelExecutorService = TransformExecutorServices.parallel(executorService);
- executorFactory = new DirectTransformExecutor.Factory(context, registry);
+ executorFactory = new DirectTransformExecutor.Factory(context, transformRegistry);
}
private CacheLoader<StepAndKey, TransformExecutorService>
@@ -306,7 +306,7 @@ final class ExecutorServiceParallelExecutor
errors.add(re);
}
try {
- registry.cleanup();
+ transformRegistry.cleanup();
} catch (final Exception e) {
errors.add(e);
}
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/PCollectionViewWriter.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/PCollectionViewWriter.java
index 895c533..944caca 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/PCollectionViewWriter.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/PCollectionViewWriter.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.values.PCollectionView;
* @param <ElemT> the type of elements the input {@link PCollection} contains.
* @param <ViewT> the type of the PCollectionView this writer writes to.
*/
+@FunctionalInterface
interface PCollectionViewWriter<ElemT, ViewT> {
void add(Iterable<WindowedValue<ElemT>> values);
}
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/PortableGraph.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/PortableGraph.java
index ef6edcb..88b6e8c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/PortableGraph.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/PortableGraph.java
@@ -66,4 +66,8 @@ class PortableGraph implements ExecutableGraph<PTransformNode, PCollectionNode>
public Collection<PTransformNode> getPerElementConsumers(PCollectionNode pCollection) {
return queryablePipeline.getPerElementConsumers(pCollection);
}
+
+ public QueryablePipeline getQueryablePipeline() {
+ return this.queryablePipeline;
+ }
}
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
index 2c2c9f0..49445f5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
@@ -279,10 +279,8 @@ public class ReferenceRunner {
.getWindowingStrategiesOrThrow(input.getWindowingStrategyId())
.getWindowCoderId();
// This coder isn't actually required for the pipeline to function properly - the KWIs can
- // be
- // passed around as pure java objects with no coding of the values, but it approximates a
- // full
- // pipeline.
+ // be passed around as pure java objects with no coding of the values, but it approximates
+ // a full pipeline.
Coder kwiCoder =
Coder.newBuilder()
.setSpec(