You are viewing a plain-text version of this content; the canonical link to the original message was not preserved in this copy.
Posted to commits@hive.apache.org by jd...@apache.org on 2020/05/29 23:17:47 UTC

[hive] branch master updated: HIVE-23068: Error when submitting fragment to LLAP via external client: IllegalStateException: Only a single registration allowed per entity (Jason Dere, reviewed by Prasanth Jayachandran)

This is an automated email from the ASF dual-hosted git repository.

jdere pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 60ff1fc  HIVE-23068: Error when submitting fragment to LLAP via external client: IllegalStateException: Only a single registration allowed per entity (Jason Dere, reviewed by Prasanth Jayachandran)
60ff1fc is described below

commit 60ff1fc7f8a9b010b99db0b1add788289f836b77
Author: Jason Dere <jd...@cloudera.com>
AuthorDate: Fri May 29 16:14:06 2020 -0700

    HIVE-23068: Error when submitting fragment to LLAP via external client: IllegalStateException: Only a single registration allowed per entity (Jason Dere, reviewed by Prasanth Jayachandran)
---
 .../hadoop/hive/llap/daemon/impl/QueryInfo.java    | 13 +++-
 .../hive/llap/daemon/LlapDaemonTestUtils.java      | 13 +++-
 .../llap/daemon/impl/TestContainerRunnerImpl.java  | 79 ++++++++++++++++++++++
 3 files changed, 103 insertions(+), 2 deletions(-)

diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index c464c2f..fc3ae97 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -140,7 +140,18 @@ public class QueryInfo {
       int attemptNumber, SignableVertexSpec vertexSpec, String fragmentIdString) {
     QueryFragmentInfo fragmentInfo = new QueryFragmentInfo(
         this, vertexName, fragmentNumber, attemptNumber, vertexSpec, fragmentIdString);
-    knownFragments.add(fragmentInfo);
+    boolean wasUniqueFragment = knownFragments.add(fragmentInfo);
+    if (!wasUniqueFragment) {
+      // The same query fragment (including attempt number) has already been registered.
+      // This could potentially occur for external clients that are trying to submit the
+      // exact same fragment more than once (for example speculative execution of a query fragment).
+      // This should not occur for a non-external query fragment.
+      // Either way, registering the same fragment twice should be disallowed, as the LLAP structures
+      // assume there will only ever be a single submission of a given fragment attempt.
+      String message = "Fragment " + fragmentIdString + "(isExternal=" + isExternalQuery()
+              + ") has already been registered.";
+      throw new IllegalArgumentException(message);
+    }
     return fragmentInfo;
   }
 
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/LlapDaemonTestUtils.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/LlapDaemonTestUtils.java
index 21f732b..83ee1e9 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/LlapDaemonTestUtils.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/LlapDaemonTestUtils.java
@@ -28,12 +28,23 @@ public class LlapDaemonTestUtils {
   private LlapDaemonTestUtils() {}
 
   public static SubmitWorkRequestProto buildSubmitProtoRequest(int fragmentNumber,
+                                                               String appId, int dagId, int vId, String dagName,
+                                                               int dagStartTime, int attemptStartTime, int numSelfAndUpstreamTasks, int numSelfAndUpstreamComplete,
+                                                               int withinDagPriority, Credentials credentials) throws IOException {
+    return buildSubmitProtoRequest(fragmentNumber, 0,
+            appId, dagId, vId, dagName, dagStartTime, attemptStartTime,
+            numSelfAndUpstreamTasks, numSelfAndUpstreamComplete,
+            withinDagPriority, credentials);
+  }
+
+  public static SubmitWorkRequestProto buildSubmitProtoRequest(int fragmentNumber,
+      int attemptNumber,
       String appId, int dagId, int vId, String dagName,
       int dagStartTime, int attemptStartTime, int numSelfAndUpstreamTasks, int numSelfAndUpstreamComplete,
       int withinDagPriority, Credentials credentials) throws IOException {
     return SubmitWorkRequestProto
         .newBuilder()
-        .setAttemptNumber(0)
+        .setAttemptNumber(attemptNumber)
         .setFragmentNumber(fragmentNumber)
         .setWorkSpec(
             LlapDaemonProtocolProtos.VertexOrBinary.newBuilder().setVertex(
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java
index 93ca9f2..e1c0690 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java
@@ -45,6 +45,7 @@ import java.io.File;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.mockito.Mockito.mock;
@@ -132,6 +133,11 @@ public class TestContainerRunnerImpl {
 
   @After
   public void cleanup() throws Exception {
+    for (Object key : ShuffleHandler.get().getRegisteredApps().keySet()) {
+      String appId = (String) key;
+      ShuffleHandler.get().unregisterDag(null, appId, dagId);
+    }
+
     containerRunner.serviceStop();
     queryTracker.serviceStop();
     executorService.serviceStop();
@@ -155,6 +161,7 @@ public class TestContainerRunnerImpl {
                 .setDagIndex(dagId)
                 .build())
         .build();
+
     containerRunner.registerDag(request);
     Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().size(), 1);
     Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().get(appId), dagId);
@@ -179,4 +186,76 @@ public class TestContainerRunnerImpl {
       Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().get(appId), dagId);
     }
   }
+
+  @Test(timeout = 10000)
+  public void testSubmitSameFragment() throws Exception {
+    Credentials credentials = new Credentials();
+    Token<LlapTokenIdentifier> sessionToken = new Token<>(
+            "identifier".getBytes(), "testPassword".getBytes(), new Text("kind"), new Text("service"));
+    TokenCache.setSessionToken(sessionToken, credentials);
+
+    RegisterDagRequestProto request = RegisterDagRequestProto.newBuilder()
+            .setUser(testUser)
+            .setCredentialsBinary(ByteString.copyFrom(LlapTezUtils.serializeCredentials(credentials)))
+            .setQueryIdentifier(
+                    QueryIdentifierProto.newBuilder()
+                            .setApplicationIdString(appId)
+                            .setDagIndex(dagId)
+                            .build())
+            .build();
+    containerRunner.registerDag(request);
+    Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().size(), 1);
+    Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().get(appId), dagId);
+    Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().size(), 0);
+
+    int fragNum = 1;
+    int attemptNum = 0;
+    SubmitWorkRequestProto sRequest1 =
+            LlapDaemonTestUtils.buildSubmitProtoRequest(fragNum, attemptNum, appId,
+                    dagId, vId, "dagName", 0, 0,
+                    0, 0, 1,
+                    credentials);
+
+    containerRunner.submitWork(sRequest1);
+    Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().size(), 1);
+    Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().get(appId), dagId);
+    if (ShuffleHandler.get().isDirWatcherEnabled()) {
+      Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().size(), 1);
+      Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().get(appId), dagId);
+    }
+
+    // submitWork() was successful, should show up as an active fragment.
+    Assert.assertEquals(1, containerRunner.getExecutorStatus().size());
+    boolean caughtException = false;
+
+    // Try exact same fragment ID + attempt number - should fail.
+    try {
+      SubmitWorkRequestProto sRequest2 =
+              LlapDaemonTestUtils.buildSubmitProtoRequest(fragNum, attemptNum, appId,
+                      dagId, vId, "dagName", 0, 0,
+                      0, 0, 1,
+                      credentials);
+
+      containerRunner.submitWork(sRequest2);
+    } catch (IllegalArgumentException err) {
+      err.printStackTrace();
+      caughtException = true;
+    }
+    Assert.assertTrue(caughtException);
+    // request failed so should still only have the 1 fragment
+    Assert.assertEquals(1, containerRunner.getExecutorStatus().size());
+
+    // Try same fragment ID with different attempt number - should work.
+    attemptNum = 1;
+    SubmitWorkRequestProto sRequest3 =
+            LlapDaemonTestUtils.buildSubmitProtoRequest(fragNum, attemptNum, appId,
+                    dagId, vId, "dagName", 0, 0,
+                    0, 0, 1,
+                    credentials);
+
+    containerRunner.submitWork(sRequest3);
+
+    // Should now have 2 fragments registered.
+    Assert.assertEquals(2, containerRunner.getExecutorStatus().size());
+  }
 }