Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2019/09/05 00:45:00 UTC

[jira] [Work logged] (BEAM-7600) Spark portable runner: reuse SDK harness

     [ https://issues.apache.org/jira/browse/BEAM-7600?focusedWorklogId=306799&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-306799 ]

ASF GitHub Bot logged work on BEAM-7600:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Sep/19 00:44
            Start Date: 05/Sep/19 00:44
    Worklog Time Spent: 10m 
      Work Description: angoenka commented on pull request #9095: [BEAM-7600] borrow SDK harness management code into Spark runner
URL: https://github.com/apache/beam/pull/9095#discussion_r321029943
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ReferenceCountingExecutableStageContextFactory.java
 ##########
 @@ -26,42 +26,46 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
-import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.sdk.function.ThrowingFunction;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * {@link FlinkExecutableStageContext.Factory} which counts FlinkExecutableStageContext reference
- * for book keeping.
+ * {@link ExecutableStageContext.Factory} which counts ExecutableStageContext reference for book
+ * keeping.
  */
-public class ReferenceCountingFlinkExecutableStageContextFactory
-    implements FlinkExecutableStageContext.Factory {
+public class ReferenceCountingExecutableStageContextFactory
+    implements ExecutableStageContext.Factory {
   private static final Logger LOG =
-      LoggerFactory.getLogger(ReferenceCountingFlinkExecutableStageContextFactory.class);
+      LoggerFactory.getLogger(ReferenceCountingExecutableStageContextFactory.class);
   private static final int MAX_RETRY = 3;
 
   private final Creator creator;
   private transient volatile ScheduledExecutorService executor;
   private transient volatile ConcurrentHashMap<String, WrappedContext> keyRegistry;
+  private final SerializableFunction<Object, Boolean> isReleaseSynchronous;
 
-  public static ReferenceCountingFlinkExecutableStageContextFactory create(Creator creator) {
-    return new ReferenceCountingFlinkExecutableStageContextFactory(creator);
+  public static ReferenceCountingExecutableStageContextFactory create(
+      Creator creator, SerializableFunction<Object, Boolean> isReleaseSynchronous) {
+    return new ReferenceCountingExecutableStageContextFactory(creator, isReleaseSynchronous);
   }
 
-  private ReferenceCountingFlinkExecutableStageContextFactory(Creator creator) {
+  private ReferenceCountingExecutableStageContextFactory(
+      Creator creator, SerializableFunction<Object, Boolean> isReleaseSynchronous) {
     this.creator = creator;
+    this.isReleaseSynchronous = isReleaseSynchronous;
   }
 
   @Override
-  public FlinkExecutableStageContext get(JobInfo jobInfo) {
+  public ExecutableStageContext get(
+      JobInfo jobInfo, SerializableFunction<Object, Boolean> isReleaseSynchronous) {
 
 Review comment:
   The `isReleaseSynchronous` parameter is not used.
 


Issue Time Tracking
-------------------

    Worklog Id:     (was: 306799)
    Time Spent: 6h 10m  (was: 6h)

> Spark portable runner: reuse SDK harness
> ----------------------------------------
>
>                 Key: BEAM-7600
>                 URL: https://issues.apache.org/jira/browse/BEAM-7600
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-spark
>            Reporter: Kyle Weaver
>            Assignee: Kyle Weaver
>            Priority: Major
>              Labels: portability-spark
>          Time Spent: 6h 10m
>  Remaining Estimate: 0h
>
> Right now, we're creating a new SDK harness every time an executable stage is run [1], which is expensive. We should be able to re-use code from the Flink runner to re-use the SDK harness [2].
>  
> [1] [https://github.com/apache/beam/blob/c9fb261bc7666788402840bb6ce1b0ce2fd445d1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java#L135]
> [2] [https://github.com/apache/beam/blob/c9fb261bc7666788402840bb6ce1b0ce2fd445d1/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java]
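
The reuse described above can be illustrated with a minimal reference-counting sketch. This is not Beam's implementation: the class `RefCountingCache`, its methods, and the `creator`/`closer` callbacks are hypothetical names chosen for illustration, and the cached value stands in for an expensive resource such as an SDK harness. The idea is that the first caller for a key pays the creation cost, later callers share the same instance, and the resource is closed only when the last reference is released.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical sketch of reference-counted reuse; not part of Beam's API. */
final class RefCountingCache<V> {
  /** A cached value together with the number of outstanding references. */
  private static final class Entry<T> {
    final T value;
    int refs;

    Entry(T value) {
      this.value = value;
    }
  }

  private final Map<String, Entry<V>> entries = new HashMap<>();
  private final Function<String, V> creator; // builds the expensive resource
  private final Consumer<V> closer; // tears it down when no longer referenced

  RefCountingCache(Function<String, V> creator, Consumer<V> closer) {
    this.creator = creator;
    this.closer = closer;
  }

  /** Returns the shared value for the key, creating it on first use. */
  synchronized V acquire(String key) {
    Entry<V> entry = entries.computeIfAbsent(key, k -> new Entry<>(creator.apply(k)));
    entry.refs++;
    return entry.value;
  }

  /** Drops one reference; evicts and closes the value when none remain. */
  synchronized void release(String key) {
    Entry<V> entry = entries.get(key);
    if (entry == null) {
      throw new IllegalStateException("No entry for key: " + key);
    }
    if (--entry.refs == 0) {
      entries.remove(key);
      closer.accept(entry.value);
    }
  }

  /** Number of keys that currently hold a live value. */
  synchronized int size() {
    return entries.size();
  }
}
```

In this sketch the release is always immediate. The factory in the diff instead takes an `isReleaseSynchronous` function, which suggests the release timing there is configurable per caller; that behavior is not modeled here.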


