You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2020/05/13 03:23:02 UTC
[hive] branch master updated: HIVE-23409 : If TezSession application reopen fails for Timeline service down, default TezSession from SessionPool is closed after a retry ( Naresh PR via Ashutosh Chauhan)
This is an automated email from the ASF dual-hosted git repository.
hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 9ffbbdc HIVE-23409 : If TezSession application reopen fails for Timeline service down, default TezSession from SessionPool is closed after a retry ( Naresh PR via Ashutosh Chauhan)
9ffbbdc is described below
commit 9ffbbdc1bc09ea18cda650d65963d6b348403eb9
Author: nareshpr <pr...@gmail.com>
AuthorDate: Thu May 7 20:58:20 2020 -0700
HIVE-23409 : If TezSession application reopen fails for Timeline service down, default TezSession from SessionPool is closed after a retry ( Naresh PR via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
.../apache/hadoop/hive/ql/exec/tez/TezTask.java | 12 ++++--
.../hadoop/hive/ql/exec/tez/TestTezTask.java | 49 ++++++++++++++++++++++
2 files changed, 57 insertions(+), 4 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index b1bf2f8..3d27632 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -573,7 +573,6 @@ public class TezTask extends Task<TezWork> {
dagClient = sessionState.getSession().submitDAG(dag);
} catch (SessionNotRunning nr) {
console.printInfo("Tez session was closed. Reopening...");
- sessionStateRef.value = null;
sessionStateRef.value = sessionState = getNewTezSessionOnError(sessionState);
console.printInfo("Session re-established.");
dagClient = sessionState.getSession().submitDAG(dag);
@@ -583,13 +582,18 @@ public class TezTask extends Task<TezWork> {
try {
console.printInfo("Dag submit failed due to " + e.getMessage() + " stack trace: "
+ Arrays.toString(e.getStackTrace()) + " retrying...");
- sessionStateRef.value = null;
sessionStateRef.value = sessionState = getNewTezSessionOnError(sessionState);
dagClient = sessionState.getSession().submitDAG(dag);
} catch (Exception retryException) {
- // we failed to submit after retrying. Destroy session and bail.
+ // we failed to submit after retrying.
+ // If this is a non-pool session, destroy it.
+ // Otherwise move it to sessionPool, reopen will retry.
sessionStateRef.value = null;
- sessionState.destroy();
+ if (sessionState.isDefault() && sessionState instanceof TezSessionPoolSession) {
+ sessionState.returnToSessionManager();
+ } else {
+ sessionState.destroy();
+ }
throw retryException;
}
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
index cdcac45..e5ae58c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -69,6 +69,7 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyMap;
import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
@@ -232,6 +233,54 @@ public class TestTezTask {
}
@Test
+ public void testSubmitOnNonPoolSession() throws Exception {
+ DAG dag = DAG.create("test");
+
+ // Destroy session in case of non-pool tez session
+ TezSessionState tezSessionState = mock(TezSessionState.class);
+ TezClient tezClient = mock(TezClient.class);
+ when(tezSessionState.reopen()).thenThrow(new HiveException("Dag cannot be submitted"));
+ when(tezSessionState.getSession()).thenReturn(tezClient);
+ when(tezClient.submitDAG(any(DAG.class))).thenThrow(new SessionNotRunning(""));
+ doNothing().when(tezSessionState).destroy();
+ boolean isException = false;
+ try {
+ task.submit(dag, Ref.from(tezSessionState));
+ } catch (Exception e) {
+ isException = true;
+ verify(tezClient, times(1)).submitDAG(any(DAG.class));
+ verify(tezSessionState, times(2)).reopen();
+ verify(tezSessionState, times(1)).destroy();
+ verify(tezSessionState, times(0)).returnToSessionManager();
+ }
+ assertTrue(isException);
+ }
+
+ @Test
+ public void testSubmitOnPoolSession() throws Exception {
+ DAG dag = DAG.create("test");
+ // Move session to TezSessionPool, reopen will handle it
+ SampleTezSessionState tezSessionPoolSession = mock(SampleTezSessionState.class);
+ TezClient tezClient = mock(TezClient.class);
+ when(tezSessionPoolSession.reopen()).thenThrow(new HiveException("Dag cannot be submitted"));
+ doNothing().when(tezSessionPoolSession).returnToSessionManager();
+ when(tezSessionPoolSession.getSession()).thenReturn(tezClient);
+ when(tezSessionPoolSession.isDefault()).thenReturn(true);
+ when(tezClient.submitDAG(any(DAG.class))).thenThrow(new SessionNotRunning(""));
+ boolean isException = false;
+ try {
+ task.submit(dag, Ref.from(tezSessionPoolSession));
+ } catch (Exception e) {
+ isException = true;
+ verify(tezClient, times(1)).submitDAG(any(DAG.class));
+ verify(tezSessionPoolSession, times(2)).reopen();
+ verify(tezSessionPoolSession, times(0)).destroy();
+ verify(tezSessionPoolSession, times(1)).returnToSessionManager();
+ }
+ assertTrue(isException);
+ }
+
+ @Test
public void testClose() throws HiveException {
task.close(work, 0, null);
verify(op, times(4)).jobClose(any(), eq(true));