You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by li...@apache.org on 2016/05/09 01:48:15 UTC
hive git commit: HIVE-13525: HoS hangs when job is empty (Rui reviewed by Szehon and Xuefu)
Repository: hive
Updated Branches:
refs/heads/master 98479c300 -> 48be04b2c
HIVE-13525: HoS hangs when job is empty (Rui reviewed by Szehon and Xuefu)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/48be04b2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/48be04b2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/48be04b2
Branch: refs/heads/master
Commit: 48be04b2c5ed7a6e6bcf163f240f50798f820065
Parents: 98479c3
Author: Rui Li <ru...@intel.com>
Authored: Mon May 9 09:47:11 2016 +0800
Committer: Rui Li <ru...@intel.com>
Committed: Mon May 9 09:47:30 2016 +0800
----------------------------------------------------------------------
data/conf/spark/standalone/hive-site.xml | 2 +-
pom.xml | 2 +
.../persistence/MapJoinTableContainerSerDe.java | 92 +++++++++++---------
.../apache/hive/spark/client/RemoteDriver.java | 48 ++++++----
4 files changed, 88 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/48be04b2/data/conf/spark/standalone/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/spark/standalone/hive-site.xml b/data/conf/spark/standalone/hive-site.xml
index ca6ec0f..bc8c0e5 100644
--- a/data/conf/spark/standalone/hive-site.xml
+++ b/data/conf/spark/standalone/hive-site.xml
@@ -216,7 +216,7 @@
<property>
<name>spark.driver.extraClassPath</name>
- <value>${maven.local.repository}/org/apache/hive/hive-it-util/${hive.version}/hive-it-util-${hive.version}.jar:${maven.local.repository}/org/apache/hive/hive-exec/${hive.version}/hive-exec-${hive.version}.jar</value>
+ <value>${maven.local.repository}/org/apache/hive/hive-it-util/${hive.version}/hive-it-util-${hive.version}.jar:${maven.local.repository}/org/apache/hive/hive-exec/${hive.version}/hive-exec-${hive.version}.jar:${maven.local.repository}/org/antlr/antlr-runtime/${antlr.version}/antlr-runtime-${antlr.version}.jar</value>
</property>
<property>
http://git-wip-us.apache.org/repos/asf/hive/blob/48be04b2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c99b05a..41522de 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1037,6 +1037,8 @@
<!-- EnforceReadOnlyTables hook and QTestUtil -->
<test.src.tables>src,src1,srcbucket,srcbucket2,src_json,src_thrift,src_sequencefile,srcpart,alltypesorc,src_hbase,cbo_t1,cbo_t2,cbo_t3,src_cbo,part,lineitem</test.src.tables>
<java.security.krb5.conf>${test.tmp.dir}/conf/krb5.conf</java.security.krb5.conf>
+ <!-- Required by spark to work around SPARK-14958 -->
+ <antlr.version>${antlr.version}</antlr.version>
</systemPropertyVariables>
</configuration>
</plugin>
http://git-wip-us.apache.org/repos/asf/hive/blob/48be04b2/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
index 7a36b53..eb48dd7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
@@ -108,12 +108,16 @@ public class MapJoinTableContainerSerDe {
public MapJoinTableContainer load(
FileSystem fs, Path folder, Configuration hconf) throws HiveException {
try {
+
+ if (!fs.exists(folder)) {
+ return getDefaultEmptyContainer(keyContext, valueContext);
+ }
if (!fs.isDirectory(folder)) {
throw new HiveException("Error, not a directory: " + folder);
}
FileStatus[] fileStatuses = fs.listStatus(folder);
if (fileStatuses == null || fileStatuses.length == 0) {
- return null;
+ return getDefaultEmptyContainer(keyContext, valueContext);
}
SerDe keySerDe = keyContext.getSerDe();
@@ -210,50 +214,51 @@ public class MapJoinTableContainerSerDe {
public MapJoinTableContainer loadFastContainer(MapJoinDesc mapJoinDesc,
FileSystem fs, Path folder, Configuration hconf) throws HiveException {
try {
- if (!fs.isDirectory(folder)) {
- throw new HiveException("Error, not a directory: " + folder);
- }
- FileStatus[] fileStatuses = fs.listStatus(folder);
- if (fileStatuses == null || fileStatuses.length == 0) {
- return null;
- }
-
- SerDe keySerDe = keyContext.getSerDe();
- SerDe valueSerDe = valueContext.getSerDe();
- Writable key = keySerDe.getSerializedClass().newInstance();
- Writable value = valueSerDe.getSerializedClass().newInstance();
-
VectorMapJoinFastTableContainer tableContainer =
new VectorMapJoinFastTableContainer(mapJoinDesc, hconf, -1);
tableContainer.setSerde(keyContext, valueContext);
- for (FileStatus fileStatus : fileStatuses) {
- Path filePath = fileStatus.getPath();
- if (ShimLoader.getHadoopShims().isDirectory(fileStatus)) {
- throw new HiveException("Error, not a file: " + filePath);
+ if (fs.exists(folder)) {
+ if (!fs.isDirectory(folder)) {
+ throw new HiveException("Error, not a directory: " + folder);
}
- InputStream is = null;
- ObjectInputStream in = null;
- try {
- is = fs.open(filePath, 4096);
- in = new ObjectInputStream(is);
- // skip the name and metadata
- in.readUTF();
- in.readObject();
- int numKeys = in.readInt();
- for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) {
- key.readFields(in);
- long numRows = in.readLong();
- for (long rowIndex = 0L; rowIndex < numRows; rowIndex++) {
- value.readFields(in);
- tableContainer.putRow(key, value);
+
+ FileStatus[] fileStatuses = fs.listStatus(folder);
+ if (fileStatuses != null && fileStatuses.length > 0) {
+ SerDe keySerDe = keyContext.getSerDe();
+ SerDe valueSerDe = valueContext.getSerDe();
+ Writable key = keySerDe.getSerializedClass().newInstance();
+ Writable value = valueSerDe.getSerializedClass().newInstance();
+
+ for (FileStatus fileStatus : fileStatuses) {
+ Path filePath = fileStatus.getPath();
+ if (ShimLoader.getHadoopShims().isDirectory(fileStatus)) {
+ throw new HiveException("Error, not a file: " + filePath);
+ }
+ InputStream is = null;
+ ObjectInputStream in = null;
+ try {
+ is = fs.open(filePath, 4096);
+ in = new ObjectInputStream(is);
+ // skip the name and metadata
+ in.readUTF();
+ in.readObject();
+ int numKeys = in.readInt();
+ for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) {
+ key.readFields(in);
+ long numRows = in.readLong();
+ for (long rowIndex = 0L; rowIndex < numRows; rowIndex++) {
+ value.readFields(in);
+ tableContainer.putRow(key, value);
+ }
+ }
+ } finally {
+ if (in != null) {
+ in.close();
+ } else if (is != null) {
+ is.close();
+ }
}
- }
- } finally {
- if (in != null) {
- in.close();
- } else if (is != null) {
- is.close();
}
}
}
@@ -312,4 +317,13 @@ public class MapJoinTableContainerSerDe {
throw new HiveException(msg, e);
}
}
+
+ // Get an empty container when the small table is empty.
+ private static MapJoinTableContainer getDefaultEmptyContainer(MapJoinObjectSerDeContext keyCtx,
+ MapJoinObjectSerDeContext valCtx) throws SerDeException {
+ MapJoinTableContainer container = new HashMapWrapper();
+ container.setSerde(keyCtx, valCtx);
+ container.seal();
+ return container;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/48be04b2/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
index f5b1e48..e3b88d1 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
@@ -45,6 +45,7 @@ import org.apache.hive.spark.client.rpc.RpcConfiguration;
import org.apache.hive.spark.counter.SparkCounters;
import org.apache.spark.JavaSparkListener;
import org.apache.spark.SparkConf;
+import org.apache.spark.SparkJobInfo;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.scheduler.SparkListenerJobEnd;
@@ -323,18 +324,22 @@ public class RemoteDriver {
private final BaseProtocol.JobRequest<T> req;
private final List<JavaFutureAction<?>> jobs;
- private final AtomicInteger completed;
+ private final AtomicInteger jobEndReceived;
+ private int completed;
private SparkCounters sparkCounters;
private Set<Integer> cachedRDDIds;
+ private Integer sparkJobId;
private Future<?> future;
JobWrapper(BaseProtocol.JobRequest<T> req) {
this.req = req;
this.jobs = Lists.newArrayList();
- this.completed = new AtomicInteger();
+ completed = 0;
+ jobEndReceived = new AtomicInteger(0);
this.sparkCounters = null;
this.cachedRDDIds = null;
+ this.sparkJobId = null;
}
@Override
@@ -351,11 +356,26 @@ public class RemoteDriver {
});
T result = req.job.call(jc);
- synchronized (completed) {
- while (completed.get() < jobs.size()) {
- LOG.debug("Client job {} finished, {} of {} Spark jobs finished.",
- req.id, completed.get(), jobs.size());
- completed.wait();
+ // In case the job is empty, there won't be JobStart/JobEnd events. The only way
+ // to know if the job has finished is to check the futures here ourselves.
+ for (JavaFutureAction<?> future : jobs) {
+ future.get();
+ completed++;
+ LOG.debug("Client job {}: {} of {} Spark jobs finished.",
+ req.id, completed, jobs.size());
+ }
+
+ // If the job is not empty (but runs fast), we have to wait until all the TaskEnd/JobEnd
+ // events are processed. Otherwise, task metrics may get lost. See HIVE-13525.
+ if (sparkJobId != null) {
+ SparkJobInfo sparkJobInfo = jc.sc().statusTracker().getJobInfo(sparkJobId);
+ if (sparkJobInfo != null && sparkJobInfo.stageIds() != null &&
+ sparkJobInfo.stageIds().length > 0) {
+ synchronized (jobEndReceived) {
+ while (jobEndReceived.get() < jobs.size()) {
+ jobEndReceived.wait();
+ }
+ }
}
}
@@ -363,11 +383,6 @@ public class RemoteDriver {
if (sparkCounters != null) {
counters = sparkCounters.snapshot();
}
- // make sure job has really succeeded
- // at this point, future.get shall not block us
- for (JavaFutureAction<?> future : jobs) {
- future.get();
- }
protocol.jobFinished(req.id, result, null, counters);
} catch (Throwable t) {
// Catch throwables in a best-effort to report job status back to the client. It's
@@ -390,9 +405,9 @@ public class RemoteDriver {
}
void jobDone() {
- synchronized (completed) {
- completed.incrementAndGet();
- completed.notifyAll();
+ synchronized (jobEndReceived) {
+ jobEndReceived.incrementAndGet();
+ jobEndReceived.notifyAll();
}
}
@@ -420,7 +435,8 @@ public class RemoteDriver {
jc.getMonitoredJobs().get(req.id).add(job);
this.sparkCounters = sparkCounters;
this.cachedRDDIds = cachedRDDIds;
- protocol.jobSubmitted(req.id, job.jobIds().get(0));
+ sparkJobId = job.jobIds().get(0);
+ protocol.jobSubmitted(req.id, sparkJobId);
}
}