Posted to commits@hive.apache.org by xu...@apache.org on 2014/12/17 21:53:58 UTC

svn commit: r1646335 - /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java

Author: xuefu
Date: Wed Dec 17 20:53:58 2014
New Revision: 1646335

URL: http://svn.apache.org/r1646335
Log:
HIVE-8972: Implement more fine-grained remote client-level events [Spark Branch] (Rui via Xuefu)

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java?rev=1646335&r1=1646334&r2=1646335&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java Wed Dec 17 20:53:58 2014
@@ -42,15 +42,16 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Used with remote spark client.
  */
 public class RemoteSparkJobStatus implements SparkJobStatus {
   private static final Log LOG = LogFactory.getLog(RemoteSparkJobStatus.class.getName());
-  // time (in seconds) to wait for a spark job to be submitted on remote cluster
+  // time (in milliseconds) to wait for a spark job to be submitted on remote cluster
   // after this period, we decide the job submission has failed so that the client won't hang forever
-  private static final int WAIT_SUBMISSION_TIMEOUT = 30;
+  private static final long WAIT_SUBMISSION_TIMEOUT = 30000;
   // remember when the monitor starts
   private final long startTime;
   private final SparkClient sparkClient;
@@ -59,7 +60,7 @@ public class RemoteSparkJobStatus implem
   public RemoteSparkJobStatus(SparkClient sparkClient, JobHandle<Serializable> jobHandle) {
     this.sparkClient = sparkClient;
     this.jobHandle = jobHandle;
-    startTime = System.currentTimeMillis();
+    startTime = System.nanoTime();
   }
 
   @Override
@@ -70,7 +71,7 @@ public class RemoteSparkJobStatus implem
   @Override
   public JobExecutionStatus getState() {
     SparkJobInfo sparkJobInfo = getSparkJobInfo();
-    return sparkJobInfo != null ? sparkJobInfo.status() : JobExecutionStatus.UNKNOWN;
+    return sparkJobInfo != null ? sparkJobInfo.status() : null;
   }
 
   @Override
@@ -132,28 +133,14 @@ public class RemoteSparkJobStatus implem
     Integer sparkJobId = jobHandle.getSparkJobIds().size() == 1 ?
         jobHandle.getSparkJobIds().get(0) : null;
     if (sparkJobId == null) {
-      int duration = (int) ((System.currentTimeMillis() - startTime) / 1000);
+      long duration = TimeUnit.MILLISECONDS.convert(
+          System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
       if (duration <= WAIT_SUBMISSION_TIMEOUT) {
         return null;
       } else {
-        LOG.info("Job hasn't been submitted after " + duration + "s. Aborting it.");
+        LOG.info("Job hasn't been submitted after " + duration / 1000 + "s. Aborting it.");
         jobHandle.cancel(false);
-        return new SparkJobInfo() {
-          @Override
-          public int jobId() {
-            return -1;
-          }
-
-          @Override
-          public int[] stageIds() {
-            return new int[0];
-          }
-
-          @Override
-          public JobExecutionStatus status() {
-            return JobExecutionStatus.FAILED;
-          }
-        };
+        return getDefaultJobInfo(sparkJobId, JobExecutionStatus.FAILED);
       }
     }
     JobHandle<SparkJobInfo> getJobInfo = sparkClient.submit(
@@ -198,42 +185,12 @@ public class RemoteSparkJobStatus implem
         if (list != null && list.size() == 1) {
           JavaFutureAction<?> futureAction = list.get(0);
           if (futureAction.isDone()) {
-            jobInfo = new SparkJobInfo() {
-              @Override
-              public int jobId() {
-                return sparkJobId;
-              }
-
-              @Override
-              public int[] stageIds() {
-                return new int[0];
-              }
-
-              @Override
-              public JobExecutionStatus status() {
-                return JobExecutionStatus.SUCCEEDED;
-              }
-            };
+            jobInfo = getDefaultJobInfo(sparkJobId, JobExecutionStatus.SUCCEEDED);
           }
         }
       }
       if (jobInfo == null) {
-        jobInfo = new SparkJobInfo() {
-          @Override
-          public int jobId() {
-            return -1;
-          }
-
-          @Override
-          public int[] stageIds() {
-            return new int[0];
-          }
-
-          @Override
-          public JobExecutionStatus status() {
-            return JobExecutionStatus.UNKNOWN;
-          }
-        };
+        jobInfo = getDefaultJobInfo(sparkJobId, JobExecutionStatus.UNKNOWN);
       }
       return jobInfo;
     }
@@ -288,4 +245,25 @@ public class RemoteSparkJobStatus implem
 
     return results;
   }
+
+  private static SparkJobInfo getDefaultJobInfo(final Integer jobId,
+      final JobExecutionStatus status) {
+    return new SparkJobInfo() {
+
+      @Override
+      public int jobId() {
+        return jobId == null ? -1 : jobId;
+      }
+
+      @Override
+      public int[] stageIds() {
+        return new int[0];
+      }
+
+      @Override
+      public JobExecutionStatus status() {
+        return status;
+      }
+    };
+  }
 }
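
A note on the timing change above: the patch measures the submission wait with the monotonic System.nanoTime() clock and converts the elapsed nanoseconds to milliseconds via TimeUnit, instead of subtracting System.currentTimeMillis() values, so the timeout is not affected by wall-clock adjustments. A minimal standalone sketch of that pattern (the class name and main method below are illustrative, not part of the patch):

    import java.util.concurrent.TimeUnit;

    public class SubmissionTimeoutSketch {
      // time (in milliseconds) to wait for submission before giving up,
      // mirroring WAIT_SUBMISSION_TIMEOUT in the patch above
      private static final long WAIT_SUBMISSION_TIMEOUT = 30000;

      // remember when monitoring starts, using the monotonic clock
      private final long startTime = System.nanoTime();

      // true once the elapsed wait exceeds the timeout
      boolean submissionTimedOut() {
        long elapsedMs = TimeUnit.MILLISECONDS.convert(
            System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
        return elapsedMs > WAIT_SUBMISSION_TIMEOUT;
      }

      public static void main(String[] args) throws InterruptedException {
        SubmissionTimeoutSketch monitor = new SubmissionTimeoutSketch();
        Thread.sleep(10L);
        System.out.println("Timed out? " + monitor.submissionTimedOut()); // prints: Timed out? false
      }
    }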
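
The three duplicated anonymous SparkJobInfo implementations are consolidated into the getDefaultJobInfo(...) factory at the end of the diff. A small self-contained usage sketch, assuming only the org.apache.spark.SparkJobInfo and org.apache.spark.JobExecutionStatus types already referenced in the file (the wrapper class and main method are illustrative):

    import org.apache.spark.JobExecutionStatus;
    import org.apache.spark.SparkJobInfo;

    public class DefaultJobInfoSketch {
      // Mirrors the getDefaultJobInfo helper above: one anonymous SparkJobInfo
      // stands in for the inline implementations the diff removes.
      static SparkJobInfo defaultJobInfo(final Integer jobId, final JobExecutionStatus status) {
        return new SparkJobInfo() {
          @Override
          public int jobId() {
            return jobId == null ? -1 : jobId; // -1 when no Spark job id is known yet
          }

          @Override
          public int[] stageIds() {
            return new int[0]; // placeholder info carries no stage ids
          }

          @Override
          public JobExecutionStatus status() {
            return status;
          }
        };
      }

      public static void main(String[] args) {
        SparkJobInfo failed = defaultJobInfo(null, JobExecutionStatus.FAILED);
        System.out.println(failed.jobId() + " -> " + failed.status()); // prints: -1 -> FAILED
      }
    }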