Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/17 20:33:56 UTC

[GitHub] [beam] amaliujia commented on a change in pull request #12601: [BEAM-9891] Generate query execution summary table after finishing jobs

amaliujia commented on a change in pull request #12601:
URL: https://github.com/apache/beam/pull/12601#discussion_r471758078



##########
File path: sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java
##########
@@ -121,19 +152,26 @@ public static void runUsingSqlTransform(String[] args) throws Exception {
             String queryString = QueryReader.readQuery(queryNameArr[i]);
             PCollectionTuple tables = getTables(pipelines[i], csvFormat, queryNameArr[i]);
 
-            tables
-                    .apply(
-                            SqlTransform.query(queryString))
-                    .apply(
-                            MapElements.into(TypeDescriptors.strings()).via((Row row) -> row.toString()))
-                    .apply(TextIO.write()
-                            .to(resultDirectory + "/" + dataSize + "/" + pipelines[i].getOptions().getJobName())
-                            .withSuffix(".txt")
-                            .withNumShards(1));
+            try {
+                tables
+                        .apply(
+                                SqlTransform.query(queryString))
+                        .apply(
+                                MapElements.into(TypeDescriptors.strings()).via((Row row) -> row.toString()))
+                        .apply(TextIO.write()
+                                .to(RESULT_DIRECTORY + "/" + dataSize + "/" + pipelines[i].getOptions().getJobName())
+                                .withSuffix(".txt")
+                                .withNumShards(1));
+            } catch (Exception e) {
+                System.out.println(queryNameArr[i] + " failed to execute");

Review comment:
       Can you replace this System.out.println with LOG? Here is an example of how LOG is declared and used: https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java#L75
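A minimal sketch of the static-logger pattern, hedged: Beam code declares an SLF4J logger as in the linked CalciteQueryPlanner. So that the snippet runs on a plain JDK, it swaps in java.util.logging and shows the SLF4J equivalent in a comment. The class QueryFailureLogging and the helper logQueryFailure are hypothetical names used only for illustration. Passing the exception to the logger keeps its stack trace, which the println drops.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

class QueryFailureLogging {
    // Beam code typically declares an SLF4J logger instead, e.g.:
    //   private static final org.slf4j.Logger LOG =
    //       org.slf4j.LoggerFactory.getLogger(SqlTransformRunner.class);
    //   ...
    //   LOG.error("{} failed to execute", queryName, e);
    // java.util.logging stands in here only so the sketch runs without extra dependencies.
    static final Logger LOG = Logger.getLogger(QueryFailureLogging.class.getName());

    /** Hypothetical helper: logs the failed query name together with the exception's stack trace. */
    static void logQueryFailure(String queryName, Exception e) {
        LOG.log(Level.SEVERE, queryName + " failed to execute", e);
    }
}
```

Inside the catch block, the call would then be something like logQueryFailure(queryNameArr[i], e) in place of the println.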

##########
File path: sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java
##########
@@ -97,7 +128,7 @@ public static void runUsingSqlTransform(String[] args) throws Exception {
 
         // Using ExecutorService and CompletionService to fulfill multi-threading functionality
         ExecutorService executor = Executors.newFixedThreadPool(nThreads);
-        CompletionService<PipelineResult> completion = new ExecutorCompletionService<>(executor);
+        CompletionService<TpcdsRunResult> completion = new ExecutorCompletionService<>(executor);

Review comment:
       Can you also check the state of the PipelineResult (https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java#L38)?

       Make sure to print "successful" only when the job state is successful.
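A minimal sketch of the runner side, assuming each task reports its query name and final job state. Results are drained from an ExecutorCompletionService as in the diff, and success is reported only for DONE, the state Beam's PipelineResult.State uses for a job that completed successfully. State here is a local stand-in mirroring the main constants of that Beam enum, so the sketch compiles without Beam on the classpath. QueryResult and runAndSummarize are hypothetical names; the PR's TpcdsRunResult would play the role of QueryResult.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SummaryFromCompletionService {
    // Local stand-in mirroring the main constants of org.apache.beam.sdk.PipelineResult.State.
    enum State { UNKNOWN, STOPPED, RUNNING, DONE, FAILED, CANCELLED, UPDATED }

    // Hypothetical per-query result: which query ran and the state it finished in.
    static final class QueryResult {
        final String queryName;
        final State state;

        QueryResult(String queryName, State state) {
            this.queryName = queryName;
            this.state = state;
        }
    }

    /** Runs every task and returns one summary line per query, reporting success only for DONE. */
    static List<String> runAndSummarize(List<Callable<QueryResult>> tasks, int nThreads)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(nThreads);
        try {
            CompletionService<QueryResult> completion = new ExecutorCompletionService<>(executor);
            for (Callable<QueryResult> task : tasks) {
                completion.submit(task);
            }
            List<String> summary = new ArrayList<>();
            for (int i = 0; i < tasks.size(); i++) {
                // take() blocks until the next task finishes, so results arrive in completion order.
                QueryResult result = completion.take().get();
                if (result.state == State.DONE) {
                    summary.add(result.queryName + " succeeded");
                } else {
                    summary.add(result.queryName + " did not succeed (state: " + result.state + ")");
                }
            }
            return summary;
        } finally {
            executor.shutdown(); // let the worker threads exit once all tasks are done
        }
    }
}
```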

##########
File path: sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRun.java
##########
@@ -24,17 +24,30 @@
 /**
  * To fulfill multi-threaded execution
  */
-public class TpcdsRun implements Callable<PipelineResult> {
+public class TpcdsRun implements Callable<TpcdsRunResult> {
     private final Pipeline pipeline;
 
     public TpcdsRun (Pipeline pipeline) {
         this.pipeline = pipeline;
     }
 
     @Override
-    public PipelineResult call() {
-        PipelineResult pipelineResult = pipeline.run();
-        pipelineResult.waitUntilFinish();
-        return pipelineResult;
+    public TpcdsRunResult call() {
+        TpcdsRunResult tpcdsRunResult;
+
+        try {
+            PipelineResult pipelineResult = pipeline.run();
+            long startTimeStamp = System.currentTimeMillis();
+            pipelineResult.waitUntilFinish();
+            long endTimeStamp = System.currentTimeMillis();

Review comment:
       Check the pipeline result state here as well.
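One way the state check could fold into call(), sketched under several assumptions. The Supplier<State> stands in for pipeline.run() followed by waitUntilFinish(), which in Beam blocks and returns the final PipelineResult.State. State is a local stand-in for that Beam enum, and RunOutcome is a hypothetical substitute for the PR's TpcdsRunResult. The start time is taken before the job starts, since some runners may do much of the work inside run() itself. A thrown exception is recorded as FAILED because some runners may surface a failed pipeline by throwing rather than by returning FAILED.

```java
import java.util.concurrent.Callable;
import java.util.function.Supplier;

class TimedRunSketch {
    // Local stand-in mirroring the main constants of org.apache.beam.sdk.PipelineResult.State.
    enum State { UNKNOWN, STOPPED, RUNNING, DONE, FAILED, CANCELLED, UPDATED }

    // Hypothetical result holder playing the role of the PR's TpcdsRunResult.
    static final class RunOutcome {
        final State state;
        final long startMillis;
        final long endMillis;

        RunOutcome(State state, long startMillis, long endMillis) {
            this.state = state;
            this.startMillis = startMillis;
            this.endMillis = endMillis;
        }

        /** Only DONE counts as success; every other final state is reported as a failure. */
        boolean isSuccessful() {
            return state == State.DONE;
        }

        long elapsedMillis() {
            return endMillis - startMillis;
        }
    }

    static final class TimedRun implements Callable<RunOutcome> {
        // Stands in for "pipeline.run().waitUntilFinish()", which blocks and returns the final state.
        private final Supplier<State> job;

        TimedRun(Supplier<State> job) {
            this.job = job;
        }

        @Override
        public RunOutcome call() {
            long start = System.currentTimeMillis(); // taken before the job starts
            State state;
            try {
                state = job.get();
            } catch (RuntimeException e) {
                state = State.FAILED; // treat a thrown failure like a FAILED terminal state
            }
            long end = System.currentTimeMillis();
            return new RunOutcome(state, start, end);
        }
    }
}
```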




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org