You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2020/05/13 03:23:02 UTC

[hive] branch master updated: HIVE-23409 : If TezSession application reopen fails for Timeline service down, default TezSession from SessionPool is closed after a retry ( Naresh PR via Ashutosh Chauhan)

This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ffbbdc  HIVE-23409 : If TezSession application reopen fails for Timeline service down, default TezSession from SessionPool is closed after a retry ( Naresh PR via Ashutosh Chauhan)
9ffbbdc is described below

commit 9ffbbdc1bc09ea18cda650d65963d6b348403eb9
Author: nareshpr <pr...@gmail.com>
AuthorDate: Thu May 7 20:58:20 2020 -0700

    HIVE-23409 : If TezSession application reopen fails for Timeline service down, default TezSession from SessionPool is closed after a retry ( Naresh PR via Ashutosh Chauhan)
    
    Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
 .../apache/hadoop/hive/ql/exec/tez/TezTask.java    | 12 ++++--
 .../hadoop/hive/ql/exec/tez/TestTezTask.java       | 49 ++++++++++++++++++++++
 2 files changed, 57 insertions(+), 4 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index b1bf2f8..3d27632 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -573,7 +573,6 @@ public class TezTask extends Task<TezWork> {
         dagClient = sessionState.getSession().submitDAG(dag);
       } catch (SessionNotRunning nr) {
         console.printInfo("Tez session was closed. Reopening...");
-        sessionStateRef.value = null;
         sessionStateRef.value = sessionState = getNewTezSessionOnError(sessionState);
         console.printInfo("Session re-established.");
         dagClient = sessionState.getSession().submitDAG(dag);
@@ -583,13 +582,18 @@ public class TezTask extends Task<TezWork> {
       try {
         console.printInfo("Dag submit failed due to " + e.getMessage() + " stack trace: "
             + Arrays.toString(e.getStackTrace()) + " retrying...");
-        sessionStateRef.value = null;
         sessionStateRef.value = sessionState = getNewTezSessionOnError(sessionState);
         dagClient = sessionState.getSession().submitDAG(dag);
       } catch (Exception retryException) {
-        // we failed to submit after retrying. Destroy session and bail.
+        // we failed to submit after retrying.
+        // If this is a non-pool session, destroy it.
+        // Otherwise move it to sessionPool, reopen will retry.
         sessionStateRef.value = null;
-        sessionState.destroy();
+        if (sessionState.isDefault() && sessionState instanceof TezSessionPoolSession) {
+          sessionState.returnToSessionManager();
+        } else {
+          sessionState.destroy();
+        }
         throw retryException;
       }
     }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
index cdcac45..e5ae58c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -69,6 +69,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyMap;
 import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
@@ -232,6 +233,54 @@ public class TestTezTask {
   }
 
   @Test
+  public void testSubmitOnNonPoolSession() throws Exception {
+    DAG dag = DAG.create("test");
+
+    // Destroy session in case of non-pool tez session
+    TezSessionState tezSessionState = mock(TezSessionState.class);
+    TezClient tezClient = mock(TezClient.class);
+    when(tezSessionState.reopen()).thenThrow(new HiveException("Dag cannot be submitted"));
+    when(tezSessionState.getSession()).thenReturn(tezClient);
+    when(tezClient.submitDAG(any(DAG.class))).thenThrow(new SessionNotRunning(""));
+    doNothing().when(tezSessionState).destroy();
+    boolean isException = false;
+    try {
+      task.submit(dag, Ref.from(tezSessionState));
+    } catch (Exception e) {
+      isException = true;
+      verify(tezClient, times(1)).submitDAG(any(DAG.class));
+      verify(tezSessionState, times(2)).reopen();
+      verify(tezSessionState, times(1)).destroy();
+      verify(tezSessionState, times(0)).returnToSessionManager();
+    }
+    assertTrue(isException);
+  }
+
+  @Test
+  public void testSubmitOnPoolSession() throws Exception {
+    DAG dag = DAG.create("test");
+    // Move session to TezSessionPool, reopen will handle it
+    SampleTezSessionState tezSessionPoolSession = mock(SampleTezSessionState.class);
+    TezClient tezClient = mock(TezClient.class);
+    when(tezSessionPoolSession.reopen()).thenThrow(new HiveException("Dag cannot be submitted"));
+    doNothing().when(tezSessionPoolSession).returnToSessionManager();
+    when(tezSessionPoolSession.getSession()).thenReturn(tezClient);
+    when(tezSessionPoolSession.isDefault()).thenReturn(true);
+    when(tezClient.submitDAG(any(DAG.class))).thenThrow(new SessionNotRunning(""));
+    boolean isException = false;
+    try {
+      task.submit(dag, Ref.from(tezSessionPoolSession));
+    } catch (Exception e) {
+      isException = true;
+      verify(tezClient, times(1)).submitDAG(any(DAG.class));
+      verify(tezSessionPoolSession, times(2)).reopen();
+      verify(tezSessionPoolSession, times(0)).destroy();
+      verify(tezSessionPoolSession, times(1)).returnToSessionManager();
+    }
+    assertTrue(isException);
+  }
+
+  @Test
   public void testClose() throws HiveException {
     task.close(work, 0, null);
     verify(op, times(4)).jobClose(any(), eq(true));