You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by sc...@apache.org on 2019/01/18 21:54:19 UTC

[beam] branch master updated: Minor code refactoring and improvements.

This is an automated email from the ASF dual-hosted git repository.

scott pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9214fa9  Minor code refactoring and improvements.
     new d4bd8bb  Merge pull request #7557: [BEAM-2928] Minor code refactoring and improvements.
9214fa9 is described below

commit 9214fa9422800df92689ce784bc81ce062fb3ede
Author: Ruoyun <hu...@gmail.com>
AuthorDate: Wed Jan 2 12:48:02 2019 -0800

    Minor code refactoring and improvements.
---
 .../runners/core/construction/graph/SideInputReference.java    |  9 +++++++++
 .../org/apache/beam/runners/direct/CloningBundleFactory.java   |  9 +++++++++
 .../apache/beam/runners/direct/ImmutableListBundleFactory.java | 10 ++++++++++
 .../org/apache/beam/runners/direct/PCollectionViewWriter.java  |  1 +
 .../direct/portable/ExecutorServiceParallelExecutor.java       | 10 +++++-----
 .../beam/runners/direct/portable/PCollectionViewWriter.java    |  1 +
 .../org/apache/beam/runners/direct/portable/PortableGraph.java |  4 ++++
 .../apache/beam/runners/direct/portable/ReferenceRunner.java   |  6 ++----
 8 files changed, 41 insertions(+), 9 deletions(-)

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SideInputReference.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SideInputReference.java
index 29e2746..e329743 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SideInputReference.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SideInputReference.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.core.construction.graph;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.base.MoreObjects;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
@@ -58,4 +59,12 @@ public abstract class SideInputReference {
   public abstract String localName();
   /** The PCollection that backs this side input. */
   public abstract PCollectionNode collection();
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("Transform", transform().toString())
+        .add("PCollection", collection().toString())
+        .toString();
+  }
 }
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java
index 9c35ef2..25571da 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
+import com.google.common.base.MoreObjects;
 import org.apache.beam.runners.local.StructuralKey;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
@@ -92,5 +93,13 @@ class CloningBundleFactory implements BundleFactory {
     public CommittedBundle<T> commit(Instant synchronizedProcessingTime) {
       return underlying.commit(synchronizedProcessingTime);
     }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("Data", underlying.toString())
+          .add("Coder", coder.toString())
+          .toString();
+    }
   }
 }
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
index f84043a..58484e1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
@@ -21,6 +21,7 @@ import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Precondi
 import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.base.MoreObjects;
 import java.util.Iterator;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -111,6 +112,15 @@ class ImmutableListBundleFactory implements BundleFactory {
       return CommittedImmutableListBundle.create(
           pcollection, key, committedElements, minSoFar, synchronizedCompletionTime);
     }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("Key", key.toString())
+          .add("PCollection", pcollection)
+          .add("Elements", elements.build())
+          .toString();
+    }
   }
 
   @AutoValue
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWriter.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWriter.java
index 6b8577b..f3828d8 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWriter.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWriter.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.values.PCollectionView;
  * @param <ElemT> the type of elements the input {@link PCollection} contains.
  * @param <ViewT> the type of the PCollectionView this writer writes to.
  */
+@FunctionalInterface
 interface PCollectionViewWriter<ElemT, ViewT> {
   void add(Iterable<WindowedValue<ElemT>> values);
 }
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java
index e444210..1a7a99a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java
@@ -63,7 +63,7 @@ final class ExecutorServiceParallelExecutor
   private final ExecutorService executorService;
 
   private final RootProviderRegistry rootRegistry;
-  private final TransformEvaluatorRegistry registry;
+  private final TransformEvaluatorRegistry transformRegistry;
 
   private final ExecutableGraph<PTransformNode, PCollectionNode> graph;
   private final EvaluationContext evaluationContext;
@@ -89,7 +89,7 @@ final class ExecutorServiceParallelExecutor
   private ExecutorServiceParallelExecutor(
       int targetParallelism,
       RootProviderRegistry rootRegistry,
-      TransformEvaluatorRegistry registry,
+      TransformEvaluatorRegistry transformRegistry,
       ExecutableGraph<PTransformNode, PCollectionNode> graph,
       EvaluationContext context) {
     this.targetParallelism = targetParallelism;
@@ -103,7 +103,7 @@ final class ExecutorServiceParallelExecutor
                 .setNameFormat("direct-runner-worker")
                 .build());
     this.rootRegistry = rootRegistry;
-    this.registry = registry;
+    this.transformRegistry = transformRegistry;
     this.graph = graph;
     this.evaluationContext = context;
 
@@ -119,7 +119,7 @@ final class ExecutorServiceParallelExecutor
     this.visibleUpdates = new QueueMessageReceiver();
 
     parallelExecutorService = TransformExecutorServices.parallel(executorService);
-    executorFactory = new DirectTransformExecutor.Factory(context, registry);
+    executorFactory = new DirectTransformExecutor.Factory(context, transformRegistry);
   }
 
   private CacheLoader<StepAndKey, TransformExecutorService>
@@ -306,7 +306,7 @@ final class ExecutorServiceParallelExecutor
       errors.add(re);
     }
     try {
-      registry.cleanup();
+      transformRegistry.cleanup();
     } catch (final Exception e) {
       errors.add(e);
     }
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/PCollectionViewWriter.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/PCollectionViewWriter.java
index 895c533..944caca 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/PCollectionViewWriter.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/PCollectionViewWriter.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.values.PCollectionView;
  * @param <ElemT> the type of elements the input {@link PCollection} contains.
  * @param <ViewT> the type of the PCollectionView this writer writes to.
  */
+@FunctionalInterface
 interface PCollectionViewWriter<ElemT, ViewT> {
   void add(Iterable<WindowedValue<ElemT>> values);
 }
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/PortableGraph.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/PortableGraph.java
index ef6edcb..88b6e8c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/PortableGraph.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/PortableGraph.java
@@ -66,4 +66,8 @@ class PortableGraph implements ExecutableGraph<PTransformNode, PCollectionNode>
   public Collection<PTransformNode> getPerElementConsumers(PCollectionNode pCollection) {
     return queryablePipeline.getPerElementConsumers(pCollection);
   }
+
+  public QueryablePipeline getQueryablePipeline() {
+    return this.queryablePipeline;
+  }
 }
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
index 2c2c9f0..49445f5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
@@ -279,10 +279,8 @@ public class ReferenceRunner {
                 .getWindowingStrategiesOrThrow(input.getWindowingStrategyId())
                 .getWindowCoderId();
         // This coder isn't actually required for the pipeline to function properly - the KWIs can
-        // be
-        // passed around as pure java objects with no coding of the values, but it approximates a
-        // full
-        // pipeline.
+        // be passed around as pure java objects with no coding of the values, but it approximates
+        // a full pipeline.
         Coder kwiCoder =
             Coder.newBuilder()
                 .setSpec(