You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2020/05/29 23:17:47 UTC
[hive] branch master updated: HIVE-23068: Error when submitting
fragment to LLAP via external client: IllegalStateException: Only a single
registration allowed per entity (Jason Dere,
reviewed by Prasanth Jayachandran)
This is an automated email from the ASF dual-hosted git repository.
jdere pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 60ff1fc HIVE-23068: Error when submitting fragment to LLAP via external client: IllegalStateException: Only a single registration allowed per entity (Jason Dere, reviewed by Prasanth Jayachandran)
60ff1fc is described below
commit 60ff1fc7f8a9b010b99db0b1add788289f836b77
Author: Jason Dere <jd...@cloudera.com>
AuthorDate: Fri May 29 16:14:06 2020 -0700
HIVE-23068: Error when submitting fragment to LLAP via external client: IllegalStateException: Only a single registration allowed per entity (Jason Dere, reviewed by Prasanth Jayachandran)
---
.../hadoop/hive/llap/daemon/impl/QueryInfo.java | 13 +++-
.../hive/llap/daemon/LlapDaemonTestUtils.java | 13 +++-
.../llap/daemon/impl/TestContainerRunnerImpl.java | 79 ++++++++++++++++++++++
3 files changed, 103 insertions(+), 2 deletions(-)
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index c464c2f..fc3ae97 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -140,7 +140,18 @@ public class QueryInfo {
int attemptNumber, SignableVertexSpec vertexSpec, String fragmentIdString) {
QueryFragmentInfo fragmentInfo = new QueryFragmentInfo(
this, vertexName, fragmentNumber, attemptNumber, vertexSpec, fragmentIdString);
- knownFragments.add(fragmentInfo);
+ boolean wasUniqueFragment = knownFragments.add(fragmentInfo);
+ if (!wasUniqueFragment) {
+ // The same query fragment (including attempt number) has already been registered.
+ // This could potentially occur for external clients that are trying to submit the
+ // exact same fragment more than once (for example speculative execution of a query fragment).
+ // This should not occur for a non-external query fragment.
+ // Either way, registering the same fragment twice should be disallowed, as the LLAP structures
+ // assume there will only ever be a single submission of a fragment.
+ String message = "Fragment " + fragmentIdString + "(isExternal=" + isExternalQuery()
+ + ") has already been registered.";
+ throw new IllegalArgumentException(message);
+ }
return fragmentInfo;
}
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/LlapDaemonTestUtils.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/LlapDaemonTestUtils.java
index 21f732b..83ee1e9 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/LlapDaemonTestUtils.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/LlapDaemonTestUtils.java
@@ -28,12 +28,23 @@ public class LlapDaemonTestUtils {
private LlapDaemonTestUtils() {}
public static SubmitWorkRequestProto buildSubmitProtoRequest(int fragmentNumber,
+ String appId, int dagId, int vId, String dagName,
+ int dagStartTime, int attemptStartTime, int numSelfAndUpstreamTasks, int numSelfAndUpstreamComplete,
+ int withinDagPriority, Credentials credentials) throws IOException {
+ return buildSubmitProtoRequest(fragmentNumber, 0,
+ appId, dagId, vId, dagName, dagStartTime, attemptStartTime,
+ numSelfAndUpstreamTasks, numSelfAndUpstreamComplete,
+ withinDagPriority, credentials);
+ }
+
+ public static SubmitWorkRequestProto buildSubmitProtoRequest(int fragmentNumber,
+ int attemptNumber,
String appId, int dagId, int vId, String dagName,
int dagStartTime, int attemptStartTime, int numSelfAndUpstreamTasks, int numSelfAndUpstreamComplete,
int withinDagPriority, Credentials credentials) throws IOException {
return SubmitWorkRequestProto
.newBuilder()
- .setAttemptNumber(0)
+ .setAttemptNumber(attemptNumber)
.setFragmentNumber(fragmentNumber)
.setWorkSpec(
LlapDaemonProtocolProtos.VertexOrBinary.newBuilder().setVertex(
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java
index 93ca9f2..e1c0690 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java
@@ -45,6 +45,7 @@ import java.io.File;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import static org.mockito.Mockito.mock;
@@ -132,6 +133,11 @@ public class TestContainerRunnerImpl {
@After
public void cleanup() throws Exception {
+ for (Object key : ShuffleHandler.get().getRegisteredApps().keySet()) {
+ String appId = (String) key;
+ ShuffleHandler.get().unregisterDag(null, appId, dagId);
+ }
+
containerRunner.serviceStop();
queryTracker.serviceStop();
executorService.serviceStop();
@@ -155,6 +161,7 @@ public class TestContainerRunnerImpl {
.setDagIndex(dagId)
.build())
.build();
+
containerRunner.registerDag(request);
Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().size(), 1);
Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().get(appId), dagId);
@@ -179,4 +186,76 @@ public class TestContainerRunnerImpl {
Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().get(appId), dagId);
}
}
+
+ @Test(timeout = 10000)
+ public void testSubmitSameFragment() throws Exception {
+ Credentials credentials = new Credentials();
+ Token<LlapTokenIdentifier> sessionToken = new Token<>(
+ "identifier".getBytes(), "testPassword".getBytes(), new Text("kind"), new Text("service"));
+ TokenCache.setSessionToken(sessionToken, credentials);
+
+ RegisterDagRequestProto request = RegisterDagRequestProto.newBuilder()
+ .setUser(testUser)
+ .setCredentialsBinary(ByteString.copyFrom(LlapTezUtils.serializeCredentials(credentials)))
+ .setQueryIdentifier(
+ QueryIdentifierProto.newBuilder()
+ .setApplicationIdString(appId)
+ .setDagIndex(dagId)
+ .build())
+ .build();
+ containerRunner.registerDag(request);
+ Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().size(), 1);
+ Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().get(appId), dagId);
+ Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().size(), 0);
+
+ int fragNum = 1;
+ int attemptNum = 0;
+ SubmitWorkRequestProto sRequest1 =
+ LlapDaemonTestUtils.buildSubmitProtoRequest(fragNum, attemptNum, appId,
+ dagId, vId, "dagName", 0, 0,
+ 0, 0, 1,
+ credentials);
+
+ containerRunner.submitWork(sRequest1);
+ Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().size(), 1);
+ Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().get(appId), dagId);
+ if (ShuffleHandler.get().isDirWatcherEnabled()) {
+ Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().size(), 1);
+ Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().get(appId), dagId);
+ }
+
+ // submitWork() was successful, should show up as an active fragment.
+ Assert.assertEquals(1, containerRunner.getExecutorStatus().size());
+ boolean caughtException = false;
+
+ // Try exact same fragment ID + attempt number - should fail.
+ try {
+ SubmitWorkRequestProto sRequest2 =
+ LlapDaemonTestUtils.buildSubmitProtoRequest(fragNum, attemptNum, appId,
+ dagId, vId, "dagName", 0, 0,
+ 0, 0, 1,
+ credentials);
+
+ containerRunner.submitWork(sRequest2);
+ } catch (IllegalArgumentException err) {
+ err.printStackTrace();
+ caughtException = true;
+ }
+ Assert.assertTrue(caughtException);
+ // The request failed, so there should still be only the 1 fragment.
+ Assert.assertEquals(1, containerRunner.getExecutorStatus().size());
+
+ // Try same fragment ID with different attempt number - should work.
+ attemptNum = 1;
+ SubmitWorkRequestProto sRequest3 =
+ LlapDaemonTestUtils.buildSubmitProtoRequest(fragNum, attemptNum, appId,
+ dagId, vId, "dagName", 0, 0,
+ 0, 0, 1,
+ credentials);
+
+ containerRunner.submitWork(sRequest3);
+
+ // Should now have 2 fragments registered.
+ Assert.assertEquals(2, containerRunner.getExecutorStatus().size());
+ }
}