Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/20 13:17:18 UTC

[GitHub] [beam] RyanSkraba commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner

RyanSkraba commented on a change in pull request #10888:
URL: https://github.com/apache/beam/pull/10888#discussion_r427878327



##########
File path: runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2PipelineOptions.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import edu.iu.dsc.tws.tset.env.TSetEnvironment;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
+
+/** Twister2PipelineOptions. */
+public interface Twister2PipelineOptions extends PipelineOptions, StreamingOptions {
+  @Description("set unique application name for Twister2 runner")

Review comment:
       As far as I remember, all of the PipelineOptions annotations are supposed to go on the `getXxx` or `isXxx` methods, not on the `setXxx` methods.
   
   I thought this was a rule, and it holds for all runners except Gearpump and Apex. I would follow the example of Spark/Flink!
   
   Also, for readability: declare `getXxx` before `setXxx` consistently for every option (or vice versa!)
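   
   For illustration, the layout I mean looks roughly like the sketch below. To keep it compilable on its own, it declares a local stand-in `@Description` annotation instead of importing Beam's `org.apache.beam.sdk.options.Description`, and `SketchOptions` is a hypothetical name, not a class in this PR.
   
   ```java
   import java.lang.annotation.ElementType;
   import java.lang.annotation.Retention;
   import java.lang.annotation.RetentionPolicy;
   import java.lang.annotation.Target;

   // Local stand-in for Beam's @Description so that this sketch compiles without Beam.
   @Retention(RetentionPolicy.RUNTIME)
   @Target(ElementType.METHOD)
   @interface Description {
     String value();
   }

   // Hypothetical options interface: the annotation sits on the getter,
   // and each getter is declared before its setter.
   interface SketchOptions {
     @Description("Unique application name for the Twister2 runner")
     String getApplicationName();

     void setApplicationName(String name);
   }
   ```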

##########
File path: runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2PipelineOptions.java
##########
@@ -0,0 +1,93 @@
+package org.apache.beam.runners.twister2;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import edu.iu.dsc.tws.tset.env.TSetEnvironment;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
+
+/** Twister2PipelineOptions. */
+public interface Twister2PipelineOptions extends PipelineOptions, StreamingOptions {
+  @Description("set unique application name for Twister2 runner")
+  void setApplicationName(String name);
+
+  String getApplicationName();

Review comment:
       Some of these options seem to be unused by the runner (unless I'm mistaken): `applicationName`, `serializers`.
   
   Are they meant to be here?

##########
File path: runners/twister2/build.gradle
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import groovy.json.JsonOutput
+
+plugins { id 'org.apache.beam.module' }
+
+applyJavaNature(automaticModuleName: 'org.apache.beam.runners.twister2')
+evaluationDependsOn(":sdks:java:core")
+configurations {
+    validatesRunner
+}
+description = "Apache Beam :: Runners :: Twister2"
+
+repositories {

Review comment:
       You can probably remove this section! 

##########
File path: runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2LegacyRunner.java
##########
@@ -0,0 +1,339 @@
+package org.apache.beam.runners.twister2;
+
+import static org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage;
+
+import edu.iu.dsc.tws.api.JobConfig;
+import edu.iu.dsc.tws.api.Twister2Job;
+import edu.iu.dsc.tws.api.config.Config;
+import edu.iu.dsc.tws.api.driver.DriverJobState;
+import edu.iu.dsc.tws.api.exceptions.Twister2RuntimeException;
+import edu.iu.dsc.tws.api.scheduler.Twister2JobState;
+import edu.iu.dsc.tws.api.tset.sets.TSet;
+import edu.iu.dsc.tws.api.tset.sets.batch.BatchTSet;
+import edu.iu.dsc.tws.local.LocalSubmitter;
+import edu.iu.dsc.tws.rsched.core.ResourceAllocator;
+import edu.iu.dsc.tws.rsched.job.Twister2Submitter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.logging.LogManager;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.SplittableParDo;
+import org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded;
+import org.apache.beam.runners.core.construction.resources.PipelineResources;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+
+/**
+ * A {@link PipelineRunner} that executes the operations in the pipeline by first translating them
+ * to a Twister2 Plan and then executing them either locally or on a Twister2 cluster, depending on
+ * the configuration.
+ */
+public class Twister2LegacyRunner extends PipelineRunner<PipelineResult> {
+
+  private static final Logger LOG = Logger.getLogger(Twister2LegacyRunner.class.getName());
+  private static final String SIDEINPUTS = "sideInputs";
+  private static final String LEAVES = "leaves";
+  private static final String GRAPH = "graph";
+  /** Provided options. */
+  private final Twister2PipelineOptions options;
+
+  public Twister2LegacyRunner(Twister2PipelineOptions options) {
+    this.options = options;
+  }
+
+  public static Twister2LegacyRunner fromOptions(PipelineOptions options) {
+    Twister2PipelineOptions pipelineOptions =
+        PipelineOptionsValidator.validate(Twister2PipelineOptions.class, options);
+    if (pipelineOptions.getFilesToStage() == null) {
+      pipelineOptions.setFilesToStage(
+          detectClassPathResourcesToStage(
+              Twister2LegacyRunner.class.getClassLoader(), pipelineOptions));
+      LOG.info(
+          "PipelineOptions.filesToStage was not specified. "
+              + "Defaulting to files from the classpath: will stage {} files. "
+              + "Enable logging at DEBUG level to see which files will be staged"
+              + pipelineOptions.getFilesToStage().size());
+    }
+    return new Twister2LegacyRunner(pipelineOptions);
+  }
+
+  @Override
+  public PipelineResult run(Pipeline pipeline) {
+    // create a worker and pass in the pipeline and then do the translation
+    Twister2PipelineExecutionEnvironment env = new Twister2PipelineExecutionEnvironment(options);
+    LOG.info("Translating pipeline to Twister2 program.");
+    pipeline.replaceAll(getDefaultOverrides());
+    env.translate(pipeline);
+    setupSystem(options);
+
+    Config config = ResourceAllocator.loadConfig(new HashMap<>());
+
+    JobConfig jobConfig = new JobConfig();
+    jobConfig.put(SIDEINPUTS, extractNames(env.getSideInputs()));
+    jobConfig.put(LEAVES, extractNames(env.getLeaves()));
+    jobConfig.put(GRAPH, env.getTSetGraph());
+
+    int workers = options.getParallelism();
+    Twister2Job twister2Job =
+        Twister2Job.newBuilder()
+            .setJobName(options.getJobName())
+            .setWorkerClass(BeamBatchWorker.class)
+            .addComputeResource(options.getWorkerCPUs(), options.getRamMegaBytes(), workers)
+            .setConfig(jobConfig)
+            .build();
+    Twister2JobState jobState = Twister2Submitter.submitJob(twister2Job, config);
+
+    Twister2PipelineResult result = new Twister2PipelineResult();
+    if (jobState.getJobstate() == DriverJobState.FAILED
+        || jobState.getJobstate() == DriverJobState.RUNNING) {
+      throw new RuntimeException("Pipeline execution failed", jobState.getCause());

Review comment:
       Sorry -- is this correct?  Reading this, it looks like if the Twister2 job is RUNNING, the pipeline fails!  Likewise with the test code below.
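   
   If RUNNING is not meant to be an error, I'd expect something closer to the sketch below. `JobState` is a local stand-in for `DriverJobState`, `ResultState` is a stand-in for `PipelineResult.State`, and the `COMPLETED` value and the `toResultState` helper are assumptions made for illustration only.
   
   ```java
   // Stand-in for DriverJobState. Only FAILED and RUNNING appear in the diff;
   // COMPLETED is an assumed placeholder for "finished successfully".
   enum JobState { RUNNING, COMPLETED, FAILED }

   // Stand-in for the two PipelineResult.State values used in this sketch.
   enum ResultState { RUNNING, DONE }

   final class JobStateCheck {
     /** Throws only when the job actually failed; a RUNNING job is reported as RUNNING. */
     static ResultState toResultState(JobState state, Throwable cause) {
       if (state == JobState.FAILED) {
         throw new RuntimeException("Pipeline execution failed", cause);
       }
       return state == JobState.RUNNING ? ResultState.RUNNING : ResultState.DONE;
     }

     public static void main(String[] args) {
       System.out.println(toResultState(JobState.RUNNING, null));
       System.out.println(toResultState(JobState.COMPLETED, null));
     }
   }
   ```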

##########
File path: runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2LegacyRunner.java
##########
@@ -0,0 +1,339 @@
+package org.apache.beam.runners.twister2;
+
+import static org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage;
+
+import edu.iu.dsc.tws.api.JobConfig;
+import edu.iu.dsc.tws.api.Twister2Job;
+import edu.iu.dsc.tws.api.config.Config;
+import edu.iu.dsc.tws.api.driver.DriverJobState;
+import edu.iu.dsc.tws.api.exceptions.Twister2RuntimeException;
+import edu.iu.dsc.tws.api.scheduler.Twister2JobState;
+import edu.iu.dsc.tws.api.tset.sets.TSet;
+import edu.iu.dsc.tws.api.tset.sets.batch.BatchTSet;
+import edu.iu.dsc.tws.local.LocalSubmitter;
+import edu.iu.dsc.tws.rsched.core.ResourceAllocator;
+import edu.iu.dsc.tws.rsched.job.Twister2Submitter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.logging.LogManager;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.SplittableParDo;
+import org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded;
+import org.apache.beam.runners.core.construction.resources.PipelineResources;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+
+/**
+ * A {@link PipelineRunner} that executes the operations in the pipeline by first translating them
+ * to a Twister2 Plan and then executing them either locally or on a Twister2 cluster, depending on
+ * the configuration.
+ */
+public class Twister2LegacyRunner extends PipelineRunner<PipelineResult> {
+
+  private static final Logger LOG = Logger.getLogger(Twister2LegacyRunner.class.getName());
+  private static final String SIDEINPUTS = "sideInputs";
+  private static final String LEAVES = "leaves";
+  private static final String GRAPH = "graph";
+  /** Provided options. */
+  private final Twister2PipelineOptions options;
+
+  public Twister2LegacyRunner(Twister2PipelineOptions options) {

Review comment:
       Consider making this constructor protected so that callers go through `fromOptions`, which ensures that `getFilesToStage()` is set.
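   
   A minimal sketch of the idea, with hypothetical names (`SketchRunner`, `SketchRunnerOptions`) and a placeholder default standing in for the classpath detection done in this PR:
   
   ```java
   import java.util.Collections;
   import java.util.List;

   // Hypothetical stand-in for Twister2PipelineOptions with a single option.
   final class SketchRunnerOptions {
     private List<String> filesToStage; // null until someone sets it

     List<String> getFilesToStage() {
       return filesToStage;
     }

     void setFilesToStage(List<String> files) {
       this.filesToStage = files;
     }
   }

   class SketchRunner {
     private final SketchRunnerOptions options;

     // Not public: code outside this package, other than subclasses, must use fromOptions.
     protected SketchRunner(SketchRunnerOptions options) {
       this.options = options;
     }

     // The factory is the one place where a missing filesToStage gets a default.
     public static SketchRunner fromOptions(SketchRunnerOptions options) {
       if (options.getFilesToStage() == null) {
         // Placeholder default; the PR detects classpath resources here instead.
         options.setFilesToStage(Collections.emptyList());
       }
       return new SketchRunner(options);
     }

     SketchRunnerOptions getOptions() {
       return options;
     }
   }
   ```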

##########
File path: runners/twister2/src/main/java/org/apache/beam/runners/twister2/BeamBatchWorker.java
##########
@@ -0,0 +1,162 @@
+package org.apache.beam.runners.twister2;
+
+import edu.iu.dsc.tws.api.config.Config;
+import edu.iu.dsc.tws.api.tset.TBase;
+import edu.iu.dsc.tws.api.tset.sets.TSet;
+import edu.iu.dsc.tws.api.tset.sets.batch.BatchTSet;
+import edu.iu.dsc.tws.tset.TBaseGraph;
+import edu.iu.dsc.tws.tset.env.BatchTSetEnvironment;
+import edu.iu.dsc.tws.tset.links.BaseTLink;
+import edu.iu.dsc.tws.tset.sets.BaseTSet;
+import edu.iu.dsc.tws.tset.sets.BuildableTSet;
+import edu.iu.dsc.tws.tset.sets.batch.CachedTSet;
+import edu.iu.dsc.tws.tset.sets.batch.ComputeTSet;
+import edu.iu.dsc.tws.tset.sets.batch.SinkTSet;
+import edu.iu.dsc.tws.tset.worker.BatchTSetIWorker;
+import java.io.Serializable;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.runners.twister2.translators.functions.DoFnFunction;
+import org.apache.beam.runners.twister2.translators.functions.Twister2SinkFunction;
+
+/**
+ * The Twister2 worker that will execute the job logic once the job is submitted from the run
+ * method.
+ */
+public class BeamBatchWorker implements Serializable, BatchTSetIWorker {
+
+  private static final String SIDEINPUTS = "sideInputs";
+  private static final String LEAVES = "leaves";
+  private static final String GRAPH = "graph";
+  private HashMap<String, BatchTSet<?>> sideInputDataSets;
+  private Set<TSet> leaves;

Review comment:
       Raw types!  You're a bit inconsistent about parameterizing `TSet`, `TLink` and their subclasses with `<?>`.  I'd strongly prefer to see the type arguments filled in consistently.
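   
   As a rough illustration of the difference, here is a sketch with a hypothetical generic class `Node<T>` standing in for `TSet<T>`; the method mirrors the shape of `extractNames` in the runner but is not taken from it.
   
   ```java
   import java.util.HashSet;
   import java.util.Set;

   // Hypothetical generic type standing in for Twister2's TSet<T>.
   final class Node<T> {
     private final String id;

     Node(String id) {
       this.id = id;
     }

     String getId() {
       return id;
     }
   }

   final class WildcardSketch {
     // Set<Node<?>> rather than the raw Set<Node>: the element type is unknown but
     // still type-checked, so the compiler reports no raw-type warnings here.
     static Set<String> extractNames(Set<Node<?>> leaves) {
       Set<String> results = new HashSet<>();
       for (Node<?> leaf : leaves) {
         results.add(leaf.getId());
       }
       return results;
     }
   }
   ```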

##########
File path: runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2PipelineResult.java
##########
@@ -0,0 +1,58 @@
+package org.apache.beam.runners.twister2;
+
+import java.io.IOException;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.joda.time.Duration;
+
+/** Represents a Twister2 pipeline execution result. */
+public class Twister2PipelineResult implements PipelineResult {
+
+  PipelineResult.State state = State.RUNNING;
+
+  @Override
+  public State getState() {
+    return state;
+  }
+
+  @Override
+  public State cancel() throws IOException {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override
+  public State waitUntilFinish(Duration duration) {
+    return State.DONE;

Review comment:
       Haha, this seems like it would be a lie most of the time!
   
   For methods that have fake results like this, I'd put a TODO (at the minimum).
   

##########
File path: runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2LegacyRunner.java
##########
@@ -0,0 +1,339 @@
+package org.apache.beam.runners.twister2;
+
+import static org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage;
+
+import edu.iu.dsc.tws.api.JobConfig;
+import edu.iu.dsc.tws.api.Twister2Job;
+import edu.iu.dsc.tws.api.config.Config;
+import edu.iu.dsc.tws.api.driver.DriverJobState;
+import edu.iu.dsc.tws.api.exceptions.Twister2RuntimeException;
+import edu.iu.dsc.tws.api.scheduler.Twister2JobState;
+import edu.iu.dsc.tws.api.tset.sets.TSet;
+import edu.iu.dsc.tws.api.tset.sets.batch.BatchTSet;
+import edu.iu.dsc.tws.local.LocalSubmitter;
+import edu.iu.dsc.tws.rsched.core.ResourceAllocator;
+import edu.iu.dsc.tws.rsched.job.Twister2Submitter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.logging.LogManager;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.SplittableParDo;
+import org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded;
+import org.apache.beam.runners.core.construction.resources.PipelineResources;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+
+/**
+ * A {@link PipelineRunner} that executes the operations in the pipeline by first translating them
+ * to a Twister2 Plan and then executing them either locally or on a Twister2 cluster, depending on
+ * the configuration.
+ */
+public class Twister2LegacyRunner extends PipelineRunner<PipelineResult> {
+
+  private static final Logger LOG = Logger.getLogger(Twister2LegacyRunner.class.getName());
+  private static final String SIDEINPUTS = "sideInputs";
+  private static final String LEAVES = "leaves";
+  private static final String GRAPH = "graph";
+  /** Provided options. */
+  private final Twister2PipelineOptions options;
+
+  public Twister2LegacyRunner(Twister2PipelineOptions options) {
+    this.options = options;
+  }
+
+  public static Twister2LegacyRunner fromOptions(PipelineOptions options) {
+    Twister2PipelineOptions pipelineOptions =
+        PipelineOptionsValidator.validate(Twister2PipelineOptions.class, options);
+    if (pipelineOptions.getFilesToStage() == null) {
+      pipelineOptions.setFilesToStage(
+          detectClassPathResourcesToStage(
+              Twister2LegacyRunner.class.getClassLoader(), pipelineOptions));
+      LOG.info(
+          "PipelineOptions.filesToStage was not specified. "
+              + "Defaulting to files from the classpath: will stage {} files. "
+              + "Enable logging at DEBUG level to see which files will be staged"
+              + pipelineOptions.getFilesToStage().size());
+    }
+    return new Twister2LegacyRunner(pipelineOptions);
+  }
+
+  @Override
+  public PipelineResult run(Pipeline pipeline) {
+    // create a worker and pass in the pipeline and then do the translation
+    Twister2PipelineExecutionEnvironment env = new Twister2PipelineExecutionEnvironment(options);
+    LOG.info("Translating pipeline to Twister2 program.");
+    pipeline.replaceAll(getDefaultOverrides());
+    env.translate(pipeline);
+    setupSystem(options);
+
+    Config config = ResourceAllocator.loadConfig(new HashMap<>());
+
+    JobConfig jobConfig = new JobConfig();
+    jobConfig.put(SIDEINPUTS, extractNames(env.getSideInputs()));
+    jobConfig.put(LEAVES, extractNames(env.getLeaves()));
+    jobConfig.put(GRAPH, env.getTSetGraph());
+
+    int workers = options.getParallelism();
+    Twister2Job twister2Job =
+        Twister2Job.newBuilder()
+            .setJobName(options.getJobName())
+            .setWorkerClass(BeamBatchWorker.class)
+            .addComputeResource(options.getWorkerCPUs(), options.getRamMegaBytes(), workers)
+            .setConfig(jobConfig)
+            .build();
+    Twister2JobState jobState = Twister2Submitter.submitJob(twister2Job, config);
+
+    Twister2PipelineResult result = new Twister2PipelineResult();
+    if (jobState.getJobstate() == DriverJobState.FAILED
+        || jobState.getJobstate() == DriverJobState.RUNNING) {
+      throw new RuntimeException("Pipeline execution failed", jobState.getCause());
+    } else {
+      result.setState(PipelineResult.State.DONE);
+    }
+    return result;
+  }
+
+  public PipelineResult runTest(Pipeline pipeline) {
+    // create a worker and pass in the pipeline and then do the translation
+    Twister2PipelineExecutionEnvironment env = new Twister2PipelineExecutionEnvironment(options);
+    LOG.info("Translating pipeline to Twister2 program.");
+    pipeline.replaceAll(getDefaultOverrides());
+    env.translate(pipeline);
+    setupSystemTest(options);
+    Map configMap = new HashMap();
+    configMap.put(SIDEINPUTS, extractNames(env.getSideInputs()));
+    configMap.put(LEAVES, extractNames(env.getLeaves()));
+    configMap.put(GRAPH, env.getTSetGraph());
+    configMap.put("twister2.network.buffer.size", 32000);
+    configMap.put("twister2.network.sendBuffer.count", 1);
+    Config config = ResourceAllocator.loadConfig(configMap);
+
+    JobConfig jobConfig = new JobConfig();
+
+    int workers = options.getParallelism();
+    Twister2Job twister2Job =
+        Twister2Job.newBuilder()
+            .setJobName(options.getJobName())
+            .setWorkerClass(BeamBatchWorker.class)
+            .addComputeResource(options.getWorkerCPUs(), options.getRamMegaBytes(), workers)
+            .setConfig(jobConfig)
+            .build();
+    Twister2JobState jobState = LocalSubmitter.submitJob(twister2Job, config);
+
+    Twister2PipelineResult result = new Twister2PipelineResult();
+    if (jobState.getJobstate() == DriverJobState.FAILED
+        || jobState.getJobstate() == DriverJobState.RUNNING) {
+      throw new RuntimeException("Pipeline execution failed", jobState.getCause());
+    } else {
+      result.setState(PipelineResult.State.DONE);
+    }
+    return result;
+  }
+
+  private void setupSystem(Twister2PipelineOptions options) {
+    prepareFilesToStage(options);
+    zipFilesToStage(options);
+    System.setProperty("cluster_type", options.getClusterType());
+    System.setProperty("twister2_home", options.getTwister2Home());
+    System.setProperty("job_file", options.getJobFileZip());
+    System.setProperty("job_type", options.getJobType());
+    // do a simple config dir validation
+    System.setProperty("config_dir", options.getTwister2Home() + "/conf/");
+    File cDir = new File(System.getProperty("config_dir"), options.getClusterType());
+
+    String[] filesList =
+        new String[] {
+          "core.yaml", "network.yaml", "data.yaml", "resource.yaml", "task.yaml",
+        };
+
+    for (String file : filesList) {
+      File toCheck = new File(cDir, file);
+      if (!toCheck.exists()) {
+        throw new Twister2RuntimeException(
+            "Couldn't find " + file + " in config directory specified.");
+      }
+    }
+
+    // setup logging
+    FileInputStream fis = null;
+    try {
+      fis = new FileInputStream(new File(cDir, "logger.properties"));
+      LogManager.getLogManager().readConfiguration(fis);
+      fis.close();
+    } catch (IOException e) {
+      LOG.warning("Couldn't load logging configuration");
+    } finally {
+      if (fis != null) {
+        try {
+          fis.close();
+        } catch (IOException e) {
+          LOG.info(e.getMessage());
+        }
+      }
+    }
+  }
+
+  private void setupSystemTest(Twister2PipelineOptions options) {
+    prepareFilesToStage(options);
+    zipFilesToStage(options);
+    System.setProperty("job_file", options.getJobFileZip());
+    System.setProperty("job_type", options.getJobType());
+  }
+
+  private Set<String> extractNames(Set<TSet> leaves) {
+    Set<String> results = new HashSet<>();
+    for (TSet leaf : leaves) {
+      results.add(leaf.getId());
+    }
+    return results;
+  }
+
+  private Map<String, String> extractNames(Map<String, BatchTSet<?>> sideInputs) {
+    Map<String, String> results = new LinkedHashMap<>();
+    for (Map.Entry<String, BatchTSet<?>> entry : sideInputs.entrySet()) {
+      results.put(entry.getKey(), entry.getValue().getId());
+    }
+    return results;
+  }
+
+  /**
+   * Classpath contains non jar files (eg. directories with .class files or empty directories) will
+   * cause exception in running log.
+   */
+  private void prepareFilesToStage(Twister2PipelineOptions options) {
+    List<String> filesToStage =
+        options.getFilesToStage().stream()
+            .map(File::new)
+            .filter(File::exists)
+            .map(
+                file -> {
+                  return file.getAbsolutePath();
+                })
+            .collect(Collectors.toList());
+    options.setFilesToStage(
+        PipelineResources.prepareFilesForStaging(
+            filesToStage,
+            MoreObjects.firstNonNull(
+                options.getTempLocation(), System.getProperty("java.io.tmpdir"))));
+  }
+
+  /**
+   * creates a single zip file from all the jar files that are listed as files to stage in options.
+   *
+   * @param options
+   */
+  private void zipFilesToStage(Twister2PipelineOptions options) {

Review comment:
       It looks like this will fill my disk with uber jars as I use the runner!  Is it possible to clean them up?  At least during the validatesRunner tests, I had about 180 uuid.zip files of 165MB each, and about half of them were suspiciously 0 bytes:
   
   ```
   ....
   -rw-rw-r--. 1 rskraba rskraba    0 May 20 11:41 /tmp/c16485fb-9cf0-41e8-bdeb-53ba6cf0fc15.zip
   -rw-rw-r--. 1 rskraba rskraba 165M May 20 11:26 /tmp/c4d8612b-0dc0-431a-9b70-0f1537b7cba4.zip
   -rw-rw-r--. 1 rskraba rskraba 165M May 20 11:26 /tmp/c53e3b3a-51bd-470f-bd56-2c195decc25f.zip
   -rw-rw-r--. 1 rskraba rskraba    0 May 20 11:41 /tmp/c543b15d-94c8-4629-802d-a20f5846eddb.zip
   ....
   ```
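   
   One possible shape for the cleanup, sketched with plain `java.nio.file` calls. `StagedZip` is a hypothetical helper, not something in this PR, and the single placeholder entry stands in for the jars that `zipFilesToStage` would add.
   
   ```java
   import java.io.IOException;
   import java.nio.charset.StandardCharsets;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.util.zip.ZipEntry;
   import java.util.zip.ZipOutputStream;

   // Hypothetical helper: owns one temporary zip and deletes it when closed.
   final class StagedZip implements AutoCloseable {
     private final Path zip;

     private StagedZip(Path zip) {
       this.zip = zip;
     }

     static StagedZip create(String prefix) throws IOException {
       Path zip = Files.createTempFile(prefix, ".zip");
       try (ZipOutputStream out = new ZipOutputStream(Files.newOutputStream(zip))) {
         // Placeholder entry; the runner would add the jars listed in filesToStage.
         out.putNextEntry(new ZipEntry("placeholder.txt"));
         out.write("staged".getBytes(StandardCharsets.UTF_8));
         out.closeEntry();
       } catch (IOException | RuntimeException e) {
         Files.deleteIfExists(zip); // do not leave a partial or empty file behind
         throw e;
       }
       return new StagedZip(zip);
     }

     Path path() {
       return zip;
     }

     @Override
     public void close() throws IOException {
       Files.deleteIfExists(zip);
     }
   }
   ```
   
   Wrapping the submit call in `try (StagedZip staged = StagedZip.create(...)) { ... }` would remove the file once the block exits. That assumes the submitter no longer needs the zip after it returns, which I haven't verified.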
   
   
   

##########
File path: runners/twister2/src/main/java/org/apache/beam/runners/twister2/package-info.java
##########
@@ -0,0 +1,20 @@
+
+/** Internal implementation of the Beam runner for Apache Flink. */

Review comment:
       Looks like Twister2 to me!  :smile: (Please fix this comment in all of the package-info.java files.)

##########
File path: runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/AssignWindowsFunction.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2.translators.functions;
+
+import edu.iu.dsc.tws.api.tset.TSetContext;
+import edu.iu.dsc.tws.api.tset.fn.ComputeCollectorFunc;
+import edu.iu.dsc.tws.api.tset.fn.RecordCollector;
+import java.io.ObjectStreamException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.logging.Logger;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.Environments;
+import org.apache.beam.runners.core.construction.SdkComponents;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.twister2.utils.Twister2AssignContext;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.InvalidProtocolBufferException;
+
+/** Assign Windows function. */
+public class AssignWindowsFunction<T>
+    implements ComputeCollectorFunc<WindowedValue<T>, Iterator<WindowedValue<T>>> {
+  private static final Logger LOG = Logger.getLogger(AssignWindowsFunction.class.getName());
+
+  private transient boolean isInitilized = false;

Review comment:
       Typo: this should be `isInitialized` (here and throughout the code).

##########
File path: runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2StreamTranslationContext.java
##########
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2;
+
+/** Twister2StreamTranslationContext. */
+public class Twister2StreamTranslationContext {}

Review comment:
       Is this file a mistake?  There's already a `Twister2StreamingTranslationContext`, so the similar name could cause confusion.



