Posted to commits@hive.apache.org by li...@apache.org on 2016/05/09 01:48:15 UTC

hive git commit: HIVE-13525: HoS hangs when job is empty (Rui reviewed by Szehon and Xuefu)

Repository: hive
Updated Branches:
  refs/heads/master 98479c300 -> 48be04b2c


HIVE-13525: HoS hangs when job is empty (Rui reviewed by Szehon and Xuefu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/48be04b2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/48be04b2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/48be04b2

Branch: refs/heads/master
Commit: 48be04b2c5ed7a6e6bcf163f240f50798f820065
Parents: 98479c3
Author: Rui Li <ru...@intel.com>
Authored: Mon May 9 09:47:11 2016 +0800
Committer: Rui Li <ru...@intel.com>
Committed: Mon May 9 09:47:30 2016 +0800

----------------------------------------------------------------------
 data/conf/spark/standalone/hive-site.xml        |  2 +-
 pom.xml                                         |  2 +
 .../persistence/MapJoinTableContainerSerDe.java | 92 +++++++++++---------
 .../apache/hive/spark/client/RemoteDriver.java  | 48 ++++++----
 4 files changed, 88 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/48be04b2/data/conf/spark/standalone/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/spark/standalone/hive-site.xml b/data/conf/spark/standalone/hive-site.xml
index ca6ec0f..bc8c0e5 100644
--- a/data/conf/spark/standalone/hive-site.xml
+++ b/data/conf/spark/standalone/hive-site.xml
@@ -216,7 +216,7 @@
 
 <property>
   <name>spark.driver.extraClassPath</name>
-  <value>${maven.local.repository}/org/apache/hive/hive-it-util/${hive.version}/hive-it-util-${hive.version}.jar:${maven.local.repository}/org/apache/hive/hive-exec/${hive.version}/hive-exec-${hive.version}.jar</value>
+  <value>${maven.local.repository}/org/apache/hive/hive-it-util/${hive.version}/hive-it-util-${hive.version}.jar:${maven.local.repository}/org/apache/hive/hive-exec/${hive.version}/hive-exec-${hive.version}.jar:${maven.local.repository}/org/antlr/antlr-runtime/${antlr.version}/antlr-runtime-${antlr.version}.jar</value>
 </property>
 
 <property>

http://git-wip-us.apache.org/repos/asf/hive/blob/48be04b2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c99b05a..41522de 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1037,6 +1037,8 @@
             <!-- EnforceReadOnlyTables hook and QTestUtil -->
             <test.src.tables>src,src1,srcbucket,srcbucket2,src_json,src_thrift,src_sequencefile,srcpart,alltypesorc,src_hbase,cbo_t1,cbo_t2,cbo_t3,src_cbo,part,lineitem</test.src.tables>
             <java.security.krb5.conf>${test.tmp.dir}/conf/krb5.conf</java.security.krb5.conf>
+            <!-- Required by spark to work around SPARK-14958 -->
+            <antlr.version>${antlr.version}</antlr.version>
           </systemPropertyVariables>
         </configuration>
       </plugin>
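
The two test-harness hunks above work together: surefire forwards antlr.version into the test JVM as a system property, and HiveConf (via Hadoop's Configuration) expands ${...} placeholders from system properties, so the antlr-runtime entry added to spark.driver.extraClassPath in hive-site.xml resolves to a real jar in the local Maven repository (the diff's comment points at SPARK-14958 as the reason the jar must be on the driver classpath). A minimal sketch of that expansion mechanism; the path and version below are illustrative assumptions, not values from this commit:

import org.apache.hadoop.conf.Configuration;

public class PropertyExpansionDemo {
  public static void main(String[] args) {
    // Stand-ins for what surefire would pass with -D; both values are assumptions.
    System.setProperty("maven.local.repository", "/home/user/.m2/repository");
    System.setProperty("antlr.version", "3.4");

    // Configuration.get() substitutes ${var} from system properties first,
    // which is how the hive-site.xml value above gets resolved at test time.
    Configuration conf = new Configuration(false);
    conf.set("spark.driver.extraClassPath",
        "${maven.local.repository}/org/antlr/antlr-runtime/"
        + "${antlr.version}/antlr-runtime-${antlr.version}.jar");
    System.out.println(conf.get("spark.driver.extraClassPath"));
    // -> /home/user/.m2/repository/org/antlr/antlr-runtime/3.4/antlr-runtime-3.4.jar
  }
}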

http://git-wip-us.apache.org/repos/asf/hive/blob/48be04b2/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
index 7a36b53..eb48dd7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
@@ -108,12 +108,16 @@ public class MapJoinTableContainerSerDe {
   public MapJoinTableContainer load(
       FileSystem fs, Path folder, Configuration hconf) throws HiveException {
     try {
+
+      if (!fs.exists(folder)) {
+        return getDefaultEmptyContainer(keyContext, valueContext);
+      }
       if (!fs.isDirectory(folder)) {
         throw new HiveException("Error, not a directory: " + folder);
       }
       FileStatus[] fileStatuses = fs.listStatus(folder);
       if (fileStatuses == null || fileStatuses.length == 0) {
-        return null;
+        return getDefaultEmptyContainer(keyContext, valueContext);
       }
 
       SerDe keySerDe = keyContext.getSerDe();
@@ -210,50 +214,51 @@ public class MapJoinTableContainerSerDe {
   public MapJoinTableContainer loadFastContainer(MapJoinDesc mapJoinDesc,
       FileSystem fs, Path folder, Configuration hconf) throws HiveException {
     try {
-      if (!fs.isDirectory(folder)) {
-        throw new HiveException("Error, not a directory: " + folder);
-      }
-      FileStatus[] fileStatuses = fs.listStatus(folder);
-      if (fileStatuses == null || fileStatuses.length == 0) {
-        return null;
-      }
-
-      SerDe keySerDe = keyContext.getSerDe();
-      SerDe valueSerDe = valueContext.getSerDe();
-      Writable key = keySerDe.getSerializedClass().newInstance();
-      Writable value = valueSerDe.getSerializedClass().newInstance();
-
       VectorMapJoinFastTableContainer tableContainer =
           new VectorMapJoinFastTableContainer(mapJoinDesc, hconf, -1);
       tableContainer.setSerde(keyContext, valueContext);
 
-      for (FileStatus fileStatus : fileStatuses) {
-        Path filePath = fileStatus.getPath();
-        if (ShimLoader.getHadoopShims().isDirectory(fileStatus)) {
-          throw new HiveException("Error, not a file: " + filePath);
+      if (fs.exists(folder)) {
+        if (!fs.isDirectory(folder)) {
+          throw new HiveException("Error, not a directory: " + folder);
         }
-        InputStream is = null;
-        ObjectInputStream in = null;
-        try {
-          is = fs.open(filePath, 4096);
-          in = new ObjectInputStream(is);
-          // skip the name and metadata
-          in.readUTF();
-          in.readObject();
-          int numKeys = in.readInt();
-          for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) {
-            key.readFields(in);
-            long numRows = in.readLong();
-            for (long rowIndex = 0L; rowIndex < numRows; rowIndex++) {
-              value.readFields(in);
-              tableContainer.putRow(key, value);
+
+        FileStatus[] fileStatuses = fs.listStatus(folder);
+        if (fileStatuses != null && fileStatuses.length > 0) {
+          SerDe keySerDe = keyContext.getSerDe();
+          SerDe valueSerDe = valueContext.getSerDe();
+          Writable key = keySerDe.getSerializedClass().newInstance();
+          Writable value = valueSerDe.getSerializedClass().newInstance();
+
+          for (FileStatus fileStatus : fileStatuses) {
+            Path filePath = fileStatus.getPath();
+            if (ShimLoader.getHadoopShims().isDirectory(fileStatus)) {
+              throw new HiveException("Error, not a file: " + filePath);
+            }
+            InputStream is = null;
+            ObjectInputStream in = null;
+            try {
+              is = fs.open(filePath, 4096);
+              in = new ObjectInputStream(is);
+              // skip the name and metadata
+              in.readUTF();
+              in.readObject();
+              int numKeys = in.readInt();
+              for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) {
+                key.readFields(in);
+                long numRows = in.readLong();
+                for (long rowIndex = 0L; rowIndex < numRows; rowIndex++) {
+                  value.readFields(in);
+                  tableContainer.putRow(key, value);
+                }
+              }
+            } finally {
+              if (in != null) {
+                in.close();
+              } else if (is != null) {
+                is.close();
+              }
             }
-          }
-        } finally {
-          if (in != null) {
-            in.close();
-          } else if (is != null) {
-            is.close();
           }
         }
       }
@@ -312,4 +317,13 @@ public class MapJoinTableContainerSerDe {
       throw new HiveException(msg, e);
     }
   }
+
+  // Get an empty container when the small table is empty.
+  private static MapJoinTableContainer getDefaultEmptyContainer(MapJoinObjectSerDeContext keyCtx,
+      MapJoinObjectSerDeContext valCtx) throws SerDeException {
+    MapJoinTableContainer container = new HashMapWrapper();
+    container.setSerde(keyCtx, valCtx);
+    container.seal();
+    return container;
+  }
 }
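
In the diff above, load() and loadFastContainer() previously returned null when the small-table directory contained no files, and load() threw when the folder was absent. With an empty job either case can legitimately occur, so both paths now fall through to a sealed, empty HashMapWrapper via getDefaultEmptyContainer(), and a map-join probe against it simply finds no matches. A minimal, self-contained sketch of that null-object pattern; the classes and names are hypothetical stand-ins, not Hive's actual types:

import java.util.Collections;
import java.util.List;
import java.util.Map;

final class EmptyContainerDemo {
  interface TableContainer {
    List<String> get(String key);
  }

  // Analogue of getDefaultEmptyContainer(): an always-empty, effectively sealed table.
  static TableContainer emptyContainer() {
    return key -> Collections.emptyList();
  }

  static TableContainer load(Map<String, List<String>> smallTableFiles) {
    if (smallTableFiles == null || smallTableFiles.isEmpty()) {
      return emptyContainer(); // previously: return null, forcing null checks downstream
    }
    return key -> smallTableFiles.getOrDefault(key, Collections.emptyList());
  }

  public static void main(String[] args) {
    TableContainer table = load(null);
    // Probing yields no matches, the correct join result for an empty small table.
    System.out.println(table.get("k1").isEmpty()); // true
  }
}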

http://git-wip-us.apache.org/repos/asf/hive/blob/48be04b2/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
index f5b1e48..e3b88d1 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
@@ -45,6 +45,7 @@ import org.apache.hive.spark.client.rpc.RpcConfiguration;
 import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.spark.JavaSparkListener;
 import org.apache.spark.SparkConf;
+import org.apache.spark.SparkJobInfo;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.scheduler.SparkListenerJobEnd;
@@ -323,18 +324,22 @@ public class RemoteDriver {
 
     private final BaseProtocol.JobRequest<T> req;
     private final List<JavaFutureAction<?>> jobs;
-    private final AtomicInteger completed;
+    private final AtomicInteger jobEndReceived;
+    private int completed;
     private SparkCounters sparkCounters;
     private Set<Integer> cachedRDDIds;
+    private Integer sparkJobId;
 
     private Future<?> future;
 
     JobWrapper(BaseProtocol.JobRequest<T> req) {
       this.req = req;
       this.jobs = Lists.newArrayList();
-      this.completed = new AtomicInteger();
+      completed = 0;
+      jobEndReceived = new AtomicInteger(0);
       this.sparkCounters = null;
       this.cachedRDDIds = null;
+      this.sparkJobId = null;
     }
 
     @Override
@@ -351,11 +356,26 @@ public class RemoteDriver {
         });
 
         T result = req.job.call(jc);
-        synchronized (completed) {
-          while (completed.get() < jobs.size()) {
-            LOG.debug("Client job {} finished, {} of {} Spark jobs finished.",
-                req.id, completed.get(), jobs.size());
-            completed.wait();
+        // In case the job is empty, there won't be JobStart/JobEnd events. The only way
+        // to know if the job has finished is to check the futures here ourselves.
+        for (JavaFutureAction<?> future : jobs) {
+          future.get();
+          completed++;
+          LOG.debug("Client job {}: {} of {} Spark jobs finished.",
+              req.id, completed, jobs.size());
+        }
+
+        // If the job is not empty (but runs fast), we have to wait until all the TaskEnd/JobEnd
+        // events are processed. Otherwise, task metrics may get lost. See HIVE-13525.
+        if (sparkJobId != null) {
+          SparkJobInfo sparkJobInfo = jc.sc().statusTracker().getJobInfo(sparkJobId);
+          if (sparkJobInfo != null && sparkJobInfo.stageIds() != null &&
+              sparkJobInfo.stageIds().length > 0) {
+            synchronized (jobEndReceived) {
+              while (jobEndReceived.get() < jobs.size()) {
+                jobEndReceived.wait();
+              }
+            }
           }
         }
 
@@ -363,11 +383,6 @@ public class RemoteDriver {
         if (sparkCounters != null) {
           counters = sparkCounters.snapshot();
         }
-        // make sure job has really succeeded
-        // at this point, future.get shall not block us
-        for (JavaFutureAction<?> future : jobs) {
-          future.get();
-        }
         protocol.jobFinished(req.id, result, null, counters);
       } catch (Throwable t) {
         // Catch throwables in a best-effort to report job status back to the client. It's
@@ -390,9 +405,9 @@ public class RemoteDriver {
     }
 
     void jobDone() {
-      synchronized (completed) {
-        completed.incrementAndGet();
-        completed.notifyAll();
+      synchronized (jobEndReceived) {
+        jobEndReceived.incrementAndGet();
+        jobEndReceived.notifyAll();
       }
     }
 
@@ -420,7 +435,8 @@ public class RemoteDriver {
       jc.getMonitoredJobs().get(req.id).add(job);
       this.sparkCounters = sparkCounters;
       this.cachedRDDIds = cachedRDDIds;
-      protocol.jobSubmitted(req.id, job.jobIds().get(0));
+      sparkJobId = job.jobIds().get(0);
+      protocol.jobSubmitted(req.id, sparkJobId);
     }
 
   }
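
The RemoteDriver change is the core of the hang fix. JobWrapper.call() used to block on a completed counter that only JobEnd listener events incremented; an empty job never fires JobStart/JobEnd, so the driver waited forever. The patch first blocks on the job futures themselves, which completes even for empty jobs, and then waits for the listener's JobEnd events only when statusTracker reports that the Spark job actually ran stages, so task metrics are not lost (capturing sparkJobId at submission makes that check possible). A minimal sketch of the two-phase wait; names are illustrative, not the spark-client API:

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

final class TwoPhaseWaitDemo {
  static final AtomicInteger jobEndReceived = new AtomicInteger(0);

  static void awaitJobs(List<Future<?>> jobs, boolean hasStages)
      throws InterruptedException, ExecutionException {
    // Phase 1: block on the futures. This completes even for an empty job,
    // which never produces JobStart/JobEnd listener events.
    for (Future<?> f : jobs) {
      f.get();
    }
    // Phase 2: only when stages really ran, wait until one JobEnd per job
    // has been delivered, so listener-side metrics are complete.
    if (hasStages) {
      synchronized (jobEndReceived) {
        while (jobEndReceived.get() < jobs.size()) {
          jobEndReceived.wait();
        }
      }
    }
  }

  // Analogue of JobWrapper.jobDone(), invoked from the listener thread on JobEnd.
  static void jobDone() {
    synchronized (jobEndReceived) {
      jobEndReceived.incrementAndGet();
      jobEndReceived.notifyAll();
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    Future<?> job = pool.submit(TwoPhaseWaitDemo::jobDone); // do work, then "JobEnd"
    awaitJobs(List.of(job), true);
    System.out.println("job result and JobEnd event both observed");
    pool.shutdown();
  }
}

Note the counter is checked inside the while loop under the lock, so a JobEnd that arrives before phase 2 begins is still counted and cannot cause a missed wakeup.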