You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ss...@apache.org on 2016/01/25 00:36:55 UTC
[3/8] hive git commit: HIVE-12448. Change to tracking of dag status
via dagIdentifier instead of dag name. (Siddharth Seth,
reviewed by Sergey Shelukhin) (cherry picked from commit
16d495809382cf1db54ab26ff3a7ba5d57caa9b2)
http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java
index 3c9ad24..f1fc285 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java
@@ -27,6 +27,7 @@ public class HistoryLogger {
private static final String HISTORY_START_TIME = "StartTime";
private static final String HISTORY_END_TIME = "EndTime";
private static final String HISTORY_DAG_NAME = "DagName";
+ private static final String HISTORY_DAG_ID = "DagId";
private static final String HISTORY_VERTEX_NAME = "VertexName";
private static final String HISTORY_TASK_ID = "TaskId";
private static final String HISTORY_ATTEMPT_ID = "TaskAttemptId";
@@ -41,29 +42,30 @@ public class HistoryLogger {
public static void logFragmentStart(String applicationIdStr, String containerIdStr,
String hostname,
- String dagName, String vertexName, int taskId,
+ String dagName, int dagIdentifier, String vertexName, int taskId,
int attemptId) {
HISTORY_LOGGER.info(
- constructFragmentStartString(applicationIdStr, containerIdStr, hostname, dagName,
+ constructFragmentStartString(applicationIdStr, containerIdStr, hostname, dagName, dagIdentifier,
vertexName, taskId, attemptId));
}
public static void logFragmentEnd(String applicationIdStr, String containerIdStr, String hostname,
- String dagName, String vertexName, int taskId, int attemptId,
+ String dagName, int dagIdentifier, String vertexName, int taskId, int attemptId,
String threadName, long startTime, boolean failed) {
HISTORY_LOGGER.info(constructFragmentEndString(applicationIdStr, containerIdStr, hostname,
- dagName, vertexName, taskId, attemptId, threadName, startTime, failed));
+ dagName, dagIdentifier, vertexName, taskId, attemptId, threadName, startTime, failed));
}
private static String constructFragmentStartString(String applicationIdStr, String containerIdStr,
- String hostname, String dagName,
+ String hostname, String dagName, int dagIdentifier,
String vertexName, int taskId, int attemptId) {
HistoryLineBuilder lb = new HistoryLineBuilder(EVENT_TYPE_FRAGMENT_START);
lb.addHostName(hostname);
lb.addAppid(applicationIdStr);
lb.addContainerId(containerIdStr);
lb.addDagName(dagName);
+ lb.addDagId(dagIdentifier);
lb.addVertexName(vertexName);
lb.addTaskId(taskId);
lb.addTaskAttemptId(attemptId);
@@ -72,7 +74,7 @@ public class HistoryLogger {
}
private static String constructFragmentEndString(String applicationIdStr, String containerIdStr,
- String hostname, String dagName,
+ String hostname, String dagName, int dagIdentifier,
String vertexName, int taskId, int attemptId,
String threadName, long startTime, boolean succeeded) {
HistoryLineBuilder lb = new HistoryLineBuilder(EVENT_TYPE_FRAGMENT_END);
@@ -80,6 +82,7 @@ public class HistoryLogger {
lb.addAppid(applicationIdStr);
lb.addContainerId(containerIdStr);
lb.addDagName(dagName);
+ lb.addDagId(dagIdentifier);
lb.addVertexName(vertexName);
lb.addTaskId(taskId);
lb.addTaskAttemptId(attemptId);
@@ -113,6 +116,10 @@ public class HistoryLogger {
return setKeyValue(HISTORY_DAG_NAME, dagName);
}
+ HistoryLineBuilder addDagId(int dagId) {
+ return setKeyValue(HISTORY_DAG_ID, String.valueOf(dagId));
+ }
+
HistoryLineBuilder addVertexName(String vertexName) {
return setKeyValue(HISTORY_VERTEX_NAME, vertexName);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java
index 7cb433b..e2caec2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java
@@ -14,6 +14,7 @@
package org.apache.hadoop.hive.llap.daemon;
+import org.apache.hadoop.hive.llap.daemon.impl.QueryIdentifier;
import org.apache.hadoop.security.token.Token;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -24,6 +25,6 @@ public interface KilledTaskHandler {
// inferred from this.
// Passing in parameters until there's some dag information stored and tracked in the daemon.
void taskKilled(String amLocation, int port, String user,
- Token<JobTokenIdentifier> jobToken, String queryId, String dagName,
+ Token<JobTokenIdentifier> jobToken, QueryIdentifier queryIdentifier,
TezTaskAttemptID taskAttemptId);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/QueryFailedHandler.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/QueryFailedHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/QueryFailedHandler.java
index 4e62a68..7f9553d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/QueryFailedHandler.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/QueryFailedHandler.java
@@ -14,7 +14,9 @@
package org.apache.hadoop.hive.llap.daemon;
+import org.apache.hadoop.hive.llap.daemon.impl.QueryIdentifier;
+
public interface QueryFailedHandler {
- public void queryFailed(String queryId, String dagName);
+ public void queryFailed(QueryIdentifier queryIdentifier);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index f6711d8..d1ec715 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -172,9 +172,9 @@ public class AMReporter extends AbstractService {
}
public void registerTask(String amLocation, int port, String user,
- Token<JobTokenIdentifier> jobToken, String queryId, String dagName) {
+ Token<JobTokenIdentifier> jobToken, QueryIdentifier queryIdentifier) {
if (LOG.isTraceEnabled()) {
- LOG.trace("Registering for heartbeat: " + amLocation + ":" + port + " for dagName=" + dagName);
+ LOG.trace("Registering for heartbeat: " + amLocation + ":" + port + " for queryIdentifier=" + queryIdentifier);
}
AMNodeInfo amNodeInfo;
synchronized (knownAppMasters) {
@@ -182,7 +182,7 @@ public class AMReporter extends AbstractService {
amNodeInfo = knownAppMasters.get(amNodeId);
if (amNodeInfo == null) {
amNodeInfo =
- new AMNodeInfo(amNodeId, user, jobToken, dagName, retryPolicy, retryTimeout, socketFactory,
+ new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory,
conf);
knownAppMasters.put(amNodeId, amNodeInfo);
// Add to the queue only the first time this is registered, and on
@@ -190,7 +190,7 @@ public class AMReporter extends AbstractService {
amNodeInfo.setNextHeartbeatTime(System.currentTimeMillis() + heartbeatInterval);
pendingHeartbeatQueeu.add(amNodeInfo);
}
- amNodeInfo.setCurrentDagName(dagName);
+ amNodeInfo.setCurrentQueryIdentifier(queryIdentifier);
amNodeInfo.incrementAndGetTaskCount();
}
}
@@ -214,12 +214,12 @@ public class AMReporter extends AbstractService {
}
public void taskKilled(String amLocation, int port, String user, Token<JobTokenIdentifier> jobToken,
- final String queryId, final String dagName, final TezTaskAttemptID taskAttemptId) {
+ final QueryIdentifier queryIdentifier, final TezTaskAttemptID taskAttemptId) {
// Not re-using the connection for the AM heartbeat - which may or may not be open by this point.
// knownAppMasters is used for sending heartbeats for queued tasks. Killed messages use a new connection.
LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port);
AMNodeInfo amNodeInfo =
- new AMNodeInfo(amNodeId, user, jobToken, dagName, retryPolicy, retryTimeout, socketFactory,
+ new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory,
conf);
// Even if the service hasn't started up. It's OK to make this invocation since this will
@@ -251,8 +251,8 @@ public class AMReporter extends AbstractService {
synchronized (knownAppMasters) {
if (LOG.isDebugEnabled()) {
LOG.debug(
- "Removing am {} with last associated dag{} from heartbeat with taskCount={}, amFailed={}",
- amNodeInfo.amNodeId, amNodeInfo.getCurrentDagName(), amNodeInfo.getTaskCount(),
+ "Removing am {} with last associated dag {} from heartbeat with taskCount={}, amFailed={}",
+ amNodeInfo.amNodeId, amNodeInfo.getCurrentQueryIdentifier(), amNodeInfo.getTaskCount(),
amNodeInfo.hasAmFailed(), amNodeInfo);
}
knownAppMasters.remove(amNodeInfo.amNodeId);
@@ -272,11 +272,11 @@ public class AMReporter extends AbstractService {
@Override
public void onFailure(Throwable t) {
- String currentDagName = amNodeInfo.getCurrentDagName();
+ QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier();
amNodeInfo.setAmFailed(true);
LOG.warn("Heartbeat failed to AM {}. Killing all other tasks for the query={}",
- amNodeInfo.amNodeId, currentDagName, t);
- queryFailedHandler.queryFailed(null, currentDagName);
+ amNodeInfo.amNodeId, currentQueryIdentifier, t);
+ queryFailedHandler.queryFailed(currentQueryIdentifier);
}
});
}
@@ -339,11 +339,11 @@ public class AMReporter extends AbstractService {
amNodeInfo.getUmbilical().nodeHeartbeat(new Text(nodeId.getHostname()),
nodeId.getPort());
} catch (IOException e) {
- String currentDagName = amNodeInfo.getCurrentDagName();
+ QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier();
amNodeInfo.setAmFailed(true);
LOG.warn("Failed to communicated with AM at {}. Killing remaining fragments for query {}",
- amNodeInfo.amNodeId, currentDagName, e);
- queryFailedHandler.queryFailed(null, currentDagName);
+ amNodeInfo.amNodeId, currentQueryIdentifier, e);
+ queryFailedHandler.queryFailed(currentQueryIdentifier);
} catch (InterruptedException e) {
if (!isShutdown.get()) {
LOG.warn("Interrupted while trying to send heartbeat to AM {}", amNodeInfo.amNodeId, e);
@@ -370,21 +370,21 @@ public class AMReporter extends AbstractService {
private final long timeout;
private final SocketFactory socketFactory;
private final AtomicBoolean amFailed = new AtomicBoolean(false);
- private String currentDagName;
+ private QueryIdentifier currentQueryIdentifier;
private LlapTaskUmbilicalProtocol umbilical;
private long nextHeartbeatTime;
public AMNodeInfo(LlapNodeId amNodeId, String user,
Token<JobTokenIdentifier> jobToken,
- String currentDagName,
+ QueryIdentifier currentQueryIdentifier,
RetryPolicy retryPolicy,
long timeout,
SocketFactory socketFactory,
Configuration conf) {
this.user = user;
this.jobToken = jobToken;
- this.currentDagName = currentDagName;
+ this.currentQueryIdentifier = currentQueryIdentifier;
this.retryPolicy = retryPolicy;
this.timeout = timeout;
this.socketFactory = socketFactory;
@@ -439,12 +439,12 @@ public class AMReporter extends AbstractService {
return taskCount.get();
}
- public synchronized String getCurrentDagName() {
- return currentDagName;
+ public synchronized QueryIdentifier getCurrentQueryIdentifier() {
+ return currentQueryIdentifier;
}
- public synchronized void setCurrentDagName(String currentDagName) {
- this.currentDagName = currentDagName;
+ public synchronized void setCurrentQueryIdentifier(QueryIdentifier queryIdentifier) {
+ this.currentQueryIdentifier = queryIdentifier;
}
synchronized void setNextHeartbeatTime(long nextTime) {
http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 0d85671..b0bf844 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -149,7 +149,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
@Override
public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws IOException {
HistoryLogger.logFragmentStart(request.getApplicationIdString(), request.getContainerIdString(),
- localAddress.get().getHostName(), request.getFragmentSpec().getDagName(),
+ localAddress.get().getHostName(), request.getFragmentSpec().getDagName(), request.getFragmentSpec().getDagId(),
request.getFragmentSpec().getVertexName(), request.getFragmentSpec().getFragmentNumber(),
request.getFragmentSpec().getAttemptNumber());
if (LOG.isInfoEnabled()) {
@@ -172,8 +172,10 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
fragmentSpec.getFragmentIdentifierString());
int dagIdentifier = taskAttemptId.getTaskID().getVertexID().getDAGId().getId();
+ QueryIdentifier queryIdentifier = new QueryIdentifier(request.getApplicationIdString(), dagIdentifier);
+
QueryFragmentInfo fragmentInfo = queryTracker
- .registerFragment(null, request.getApplicationIdString(), fragmentSpec.getDagName(),
+ .registerFragment(queryIdentifier, request.getApplicationIdString(), fragmentSpec.getDagName(),
dagIdentifier,
fragmentSpec.getVertexName(), fragmentSpec.getFragmentNumber(),
fragmentSpec.getAttemptNumber(), request.getUser(), request.getFragmentSpec());
@@ -239,28 +241,37 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
@Override
public void initializeHook(TezProcessor source) {
- queryTracker.registerDagQueryId(source.getContext().getDAGName(),
+ queryTracker.registerDagQueryId(
+ new QueryIdentifier(source.getContext().getApplicationId().toString(),
+ source.getContext().getDagIdentifier()),
HiveConf.getVar(source.getConf(), HiveConf.ConfVars.HIVEQUERYID));
}
}
@Override
- public SourceStateUpdatedResponseProto sourceStateUpdated(SourceStateUpdatedRequestProto request) {
+ public SourceStateUpdatedResponseProto sourceStateUpdated(
+ SourceStateUpdatedRequestProto request) {
LOG.info("Processing state update: " + stringifySourceStateUpdateRequest(request));
- queryTracker.registerSourceStateChange(request.getDagName(), request.getSrcName(),
+ queryTracker.registerSourceStateChange(
+ new QueryIdentifier(request.getQueryIdentifier().getAppIdentifier(),
+ request.getQueryIdentifier().getDagIdentifier()), request.getSrcName(),
request.getState());
return SourceStateUpdatedResponseProto.getDefaultInstance();
}
@Override
public QueryCompleteResponseProto queryComplete(QueryCompleteRequestProto request) {
- LOG.info("Processing queryComplete notification for {}", request.getDagName());
+ QueryIdentifier queryIdentifier =
+ new QueryIdentifier(request.getQueryIdentifier().getAppIdentifier(),
+ request.getQueryIdentifier().getDagIdentifier());
+ LOG.info("Processing queryComplete notification for {}", queryIdentifier);
List<QueryFragmentInfo> knownFragments =
- queryTracker.queryComplete(null, request.getDagName(), request.getDeleteDelay());
- LOG.info("DBG: Pending fragment count for completed query {} = {}", request.getDagName(),
+ queryTracker
+ .queryComplete(queryIdentifier, request.getDeleteDelay());
+ LOG.info("DBG: Pending fragment count for completed query {} = {}", queryIdentifier,
knownFragments.size());
for (QueryFragmentInfo fragmentInfo : knownFragments) {
- LOG.info("DBG: Issuing killFragment for completed query {} {}", request.getDagName(),
+ LOG.info("DBG: Issuing killFragment for completed query {} {}", queryIdentifier,
fragmentInfo.getFragmentIdentifierString());
executorService.killFragment(fragmentInfo.getFragmentIdentifierString());
}
@@ -276,7 +287,9 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
private String stringifySourceStateUpdateRequest(SourceStateUpdatedRequestProto request) {
StringBuilder sb = new StringBuilder();
- sb.append("dagName=").append(request.getDagName())
+ QueryIdentifier queryIdentifier = new QueryIdentifier(request.getQueryIdentifier().getAppIdentifier(),
+ request.getQueryIdentifier().getDagIdentifier());
+ sb.append("queryIdentifier=").append(queryIdentifier)
.append(", ").append("sourceName=").append(request.getSrcName())
.append(", ").append("state=").append(request.getState());
return sb.toString();
@@ -342,14 +355,14 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
}
@Override
- public void queryFailed(String queryId, String dagName) {
- LOG.info("Processing query failed notification for {}", dagName);
+ public void queryFailed(QueryIdentifier queryIdentifier) {
+ LOG.info("Processing query failed notification for {}", queryIdentifier);
List<QueryFragmentInfo> knownFragments =
- queryTracker.queryComplete(queryId, dagName, -1);
- LOG.info("DBG: Pending fragment count for failed query {} = {}", dagName,
+ queryTracker.queryComplete(queryIdentifier, -1);
+ LOG.info("DBG: Pending fragment count for failed query {} = {}", queryIdentifier,
knownFragments.size());
for (QueryFragmentInfo fragmentInfo : knownFragments) {
- LOG.info("DBG: Issuing killFragment for failed query {} {}", dagName,
+ LOG.info("DBG: Issuing killFragment for failed query {} {}", queryIdentifier,
fragmentInfo.getFragmentIdentifierString());
executorService.killFragment(fragmentInfo.getFragmentIdentifierString());
}
@@ -359,9 +372,9 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
@Override
public void taskKilled(String amLocation, int port, String user,
- Token<JobTokenIdentifier> jobToken, String queryId, String dagName,
+ Token<JobTokenIdentifier> jobToken, QueryIdentifier queryIdentifier,
TezTaskAttemptID taskAttemptId) {
- amReporter.taskKilled(amLocation, port, user, jobToken, queryId, dagName, taskAttemptId);
+ amReporter.taskKilled(amLocation, port, user, jobToken, queryIdentifier, taskAttemptId);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 4f1299d..98951e1 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -435,8 +435,8 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
private class QueryFailedHandlerProxy implements QueryFailedHandler {
@Override
- public void queryFailed(String queryId, String dagName) {
- containerRunner.queryFailed(queryId, dagName);
+ public void queryFailed(QueryIdentifier queryIdentifier) {
+ containerRunner.queryFailed(queryIdentifier);
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryIdentifier.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryIdentifier.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryIdentifier.java
new file mode 100644
index 0000000..96e77e4
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryIdentifier.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import java.util.Objects;
+
+/**
+ * An identifier for a query, which is unique.
+ * <p>
+ * Composed of an application identifier and the numeric DAG identifier within that
+ * application. Instances are immutable and implement value-based {@link #equals(Object)} and
+ * {@link #hashCode()}, so they can be used as keys in hash-based maps and sets.
+ */
+public final class QueryIdentifier {
+
+  private final String appIdentifier;
+  private final int dagIdentifier;
+
+  /**
+   * @param appIdentifier the application identifier; a {@code null} value is tolerated by
+   *     {@link #equals(Object)} and {@link #hashCode()}
+   * @param dagIdentifier the identifier of the DAG within the application
+   */
+  public QueryIdentifier(String appIdentifier, int dagIdentifier) {
+    this.appIdentifier = appIdentifier;
+    this.dagIdentifier = dagIdentifier;
+  }
+
+  /** Returns the application identifier this query belongs to. */
+  public String getAppIdentifier() {
+    return appIdentifier;
+  }
+
+  /** Returns the identifier of the DAG within its application. */
+  public int getDagIdentifier() {
+    return dagIdentifier;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    // The class is final, so an exact class check is equivalent to isAssignableFrom here.
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    QueryIdentifier that = (QueryIdentifier) o;
+    // Objects.equals avoids a NullPointerException when appIdentifier is null.
+    return dagIdentifier == that.dagIdentifier
+        && Objects.equals(appIdentifier, that.appIdentifier);
+  }
+
+  @Override
+  public int hashCode() {
+    // Same value as 31 * appIdentifier.hashCode() + dagIdentifier when appIdentifier is non-null.
+    return 31 * Objects.hashCode(appIdentifier) + dagIdentifier;
+  }
+
+  @Override
+  public String toString() {
+    return "QueryIdentifier{" +
+        "appIdentifier='" + appIdentifier + '\'' +
+        ", dagIdentifier=" + dagIdentifier +
+        '}';
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index 27f2d4c..8bec95f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentS
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
public class QueryInfo {
- private final String queryId;
+ private final QueryIdentifier queryIdentifier;
private final String appIdString;
private final String dagName;
private final int dagIdentifier;
@@ -54,10 +54,10 @@ public class QueryInfo {
private final FinishableStateTracker finishableStateTracker = new FinishableStateTracker();
- public QueryInfo(String queryId, String appIdString, String dagName, int dagIdentifier,
+ public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dagName, int dagIdentifier,
String user, ConcurrentMap<String, SourceStateProto> sourceStateMap,
String[] localDirsBase, FileSystem localFs) {
- this.queryId = queryId;
+ this.queryIdentifier = queryIdentifier;
this.appIdString = appIdString;
this.dagName = dagName;
this.dagIdentifier = dagIdentifier;
@@ -67,18 +67,14 @@ public class QueryInfo {
this.localFs = localFs;
}
- public String getQueryId() {
- return queryId;
+ public QueryIdentifier getQueryIdentifier() {
+ return queryIdentifier;
}
public String getAppIdString() {
return appIdString;
}
- public String getDagName() {
- return dagName;
- }
-
public int getDagIdentifier() {
return dagIdentifier;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 6deaefc..0676edd 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -56,8 +56,7 @@ public class QueryTracker extends AbstractService {
private final ScheduledExecutorService executorService;
- // TODO Make use if the query id for cachin when this is available.
- private final ConcurrentHashMap<String, QueryInfo> queryInfoMap = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<QueryIdentifier, QueryInfo> queryInfoMap = new ConcurrentHashMap<>();
private final String[] localDirsBase;
private final FileSystem localFs;
@@ -70,22 +69,25 @@ public class QueryTracker extends AbstractService {
// Alternately - send in an explicit dag start message before any other message is processed.
// Multiple threads communicating from a single AM gets in the way of this.
- // Keeps track of completed dags. Assumes dag names are unique across AMs.
- private final Set<String> completedDagMap = Collections.newSetFromMap(
- new ConcurrentHashMap<String, Boolean>());
+ // Keeps track of completed DAGS. QueryIdentifiers need to be unique across applications.
+ private final Set<QueryIdentifier> completedDagMap =
+ Collections.newSetFromMap(new ConcurrentHashMap<QueryIdentifier, Boolean>());
private final Lock lock = new ReentrantLock();
- private final ConcurrentMap<String, ReadWriteLock> dagSpecificLocks = new ConcurrentHashMap<>();
+ private final ConcurrentMap<QueryIdentifier, ReadWriteLock> dagSpecificLocks = new ConcurrentHashMap<>();
// Tracks various maps for dagCompletions. This is setup here since stateChange messages
// may be processed by a thread which ends up executing before a task.
- private final ConcurrentMap<String, ConcurrentMap<String, SourceStateProto>>
- sourceCompletionMap = new ConcurrentHashMap<>();
+ private final ConcurrentMap<QueryIdentifier, ConcurrentMap<String, SourceStateProto>>
+ sourceCompletionMap = new ConcurrentHashMap<>();
- // Tracks queryId by dagName. This can only be set when config is parsed in TezProcessor,
+ // Tracks HiveQueryId by QueryIdentifier. This can only be set when config is parsed in TezProcessor.
// all the other existing code passes queryId equal to 0 everywhere.
- private final ConcurrentHashMap<String, String> dagNameToQueryId = new ConcurrentHashMap<>();
+ // If we switch the runtime and move to parsing the payload in the AM - the actual hive queryId could
+ // be sent over the wire from the AM, and will take the place of AppId+dagId in QueryIdentifier.
+ private final ConcurrentHashMap<QueryIdentifier, String> queryIdentifierToHiveQueryId =
+ new ConcurrentHashMap<>();
public QueryTracker(Configuration conf, String[] localDirsBase) {
super("QueryTracker");
@@ -107,7 +109,7 @@ public class QueryTracker extends AbstractService {
/**
* Register a new fragment for a specific query
- * @param queryId
+ * @param queryIdentifier
* @param appIdString
* @param dagName
* @param dagIdentifier
@@ -117,23 +119,23 @@ public class QueryTracker extends AbstractService {
* @param user
* @throws IOException
*/
- QueryFragmentInfo registerFragment(String queryId, String appIdString, String dagName,
+ QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appIdString, String dagName,
int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber, String user,
FragmentSpecProto fragmentSpec) throws IOException {
- ReadWriteLock dagLock = getDagLock(dagName);
+ ReadWriteLock dagLock = getDagLock(queryIdentifier);
dagLock.readLock().lock();
try {
- if (!completedDagMap.contains(dagName)) {
- QueryInfo queryInfo = queryInfoMap.get(dagName);
+ if (!completedDagMap.contains(queryIdentifier)) {
+ QueryInfo queryInfo = queryInfoMap.get(queryIdentifier);
if (queryInfo == null) {
- queryInfo = new QueryInfo(queryId, appIdString, dagName, dagIdentifier, user,
- getSourceCompletionMap(dagName), localDirsBase, localFs);
- queryInfoMap.putIfAbsent(dagName, queryInfo);
+ queryInfo = new QueryInfo(queryIdentifier, appIdString, dagName, dagIdentifier, user,
+ getSourceCompletionMap(queryIdentifier), localDirsBase, localFs);
+ queryInfoMap.putIfAbsent(queryIdentifier, queryInfo);
}
return queryInfo.registerFragment(vertexName, fragmentNumber, attemptNumber, fragmentSpec);
} else {
// Cleanup the dag lock here, since it may have been created after the query completed
- dagSpecificLocks.remove(dagName);
+ dagSpecificLocks.remove(queryIdentifier);
throw new RuntimeException(
"Dag " + dagName + " already complete. Rejecting fragment ["
+ vertexName + ", " + fragmentNumber + ", " + attemptNumber + "]");
@@ -148,12 +150,12 @@ public class QueryTracker extends AbstractService {
* @param fragmentInfo
*/
void fragmentComplete(QueryFragmentInfo fragmentInfo) {
- String dagName = fragmentInfo.getQueryInfo().getDagName();
- QueryInfo queryInfo = queryInfoMap.get(dagName);
+ QueryIdentifier qId = fragmentInfo.getQueryInfo().getQueryIdentifier();
+ QueryInfo queryInfo = queryInfoMap.get(qId);
if (queryInfo == null) {
// Possible because a queryComplete message from the AM can come in first - KILL / SUCCESSFUL,
// before the fragmentComplete is reported
- LOG.info("Ignoring fragmentComplete message for unknown query");
+ LOG.info("Ignoring fragmentComplete message for unknown query: {}", qId);
} else {
queryInfo.unregisterFragment(fragmentInfo);
}
@@ -161,42 +163,40 @@ public class QueryTracker extends AbstractService {
/**
* Register completion for a query
- * @param queryId
- * @param dagName
+ * @param queryIdentifier
* @param deleteDelay
*/
- List<QueryFragmentInfo> queryComplete(String queryId, String dagName, long deleteDelay) {
+ List<QueryFragmentInfo> queryComplete(QueryIdentifier queryIdentifier, long deleteDelay) {
if (deleteDelay == -1) {
deleteDelay = defaultDeleteDelaySeconds;
}
- ReadWriteLock dagLock = getDagLock(dagName);
+ ReadWriteLock dagLock = getDagLock(queryIdentifier);
dagLock.writeLock().lock();
try {
- rememberCompletedDag(dagName);
- LOG.info("Processing queryComplete for dagName={} with deleteDelay={} seconds",
- dagName, deleteDelay);
- QueryInfo queryInfo = queryInfoMap.remove(dagName);
+ rememberCompletedDag(queryIdentifier);
+ LOG.info("Processing queryComplete for queryIdentifier={} with deleteDelay={} seconds", queryIdentifier,
+ deleteDelay);
+ QueryInfo queryInfo = queryInfoMap.remove(queryIdentifier);
if (queryInfo == null) {
- LOG.warn("Ignoring query complete for unknown dag: {}", dagName);
+ LOG.warn("Ignoring query complete for unknown dag: {}", queryIdentifier);
return Collections.emptyList();
}
String[] localDirs = queryInfo.getLocalDirsNoCreate();
if (localDirs != null) {
for (String localDir : localDirs) {
cleanupDir(localDir, deleteDelay);
- ShuffleHandler.get().unregisterDag(localDir, dagName, queryInfo.getDagIdentifier());
+ ShuffleHandler.get().unregisterDag(localDir, queryInfo.getAppIdString(), queryInfo.getDagIdentifier());
}
}
// Clearing this before sending a kill is OK, since canFinish will change to false.
// Ideally this should be a state machine where kills are issued to the executor,
// and the structures are cleaned up once all tasks complete. New requests, however,
// should not be allowed after a query complete is received.
- sourceCompletionMap.remove(dagName);
- String savedQueryId = dagNameToQueryId.remove(dagName);
- queryId = queryId == null ? savedQueryId : queryId;
- dagSpecificLocks.remove(dagName);
- if (queryId != null) {
- ObjectCacheFactory.removeLlapQueryCache(queryId);
+ sourceCompletionMap.remove(queryIdentifier);
+ String savedQueryId = queryIdentifierToHiveQueryId.remove(queryIdentifier);
+ dagSpecificLocks.remove(queryIdentifier);
+ if (savedQueryId != null) {
+ ObjectCacheFactory.removeLlapQueryCache(savedQueryId);
}
return queryInfo.getRegisteredFragments();
} finally {
@@ -206,24 +206,24 @@ public class QueryTracker extends AbstractService {
- public void rememberCompletedDag(String dagName) {
- if (completedDagMap.add(dagName)) {
+ public void rememberCompletedDag(QueryIdentifier queryIdentifier) {
+ if (completedDagMap.add(queryIdentifier)) {
// We will remember completed DAG for an hour to avoid execution out-of-order fragments.
- executorService.schedule(new DagMapCleanerCallable(dagName), 1, TimeUnit.HOURS);
+ executorService.schedule(new DagMapCleanerCallable(queryIdentifier), 1, TimeUnit.HOURS);
} else {
- LOG.warn("Couldn't add {} to completed dag set", dagName);
+ LOG.warn("Couldn't add {} to completed dag set", queryIdentifier);
}
}
/**
* Register an update to a source within an executing dag
- * @param dagName
+ * @param queryIdentifier
* @param sourceName
* @param sourceState
*/
- void registerSourceStateChange(String dagName, String sourceName, SourceStateProto sourceState) {
- getSourceCompletionMap(dagName).put(sourceName, sourceState);
- QueryInfo queryInfo = queryInfoMap.get(dagName);
+ void registerSourceStateChange(QueryIdentifier queryIdentifier, String sourceName, SourceStateProto sourceState) {
+ getSourceCompletionMap(queryIdentifier).put(sourceName, sourceState);
+ QueryInfo queryInfo = queryInfoMap.get(queryIdentifier);
if (queryInfo != null) {
queryInfo.sourceStateUpdated(sourceName);
} else {
@@ -233,13 +233,13 @@ public class QueryTracker extends AbstractService {
}
- private ReadWriteLock getDagLock(String dagName) {
+ private ReadWriteLock getDagLock(QueryIdentifier queryIdentifier) {
lock.lock();
try {
- ReadWriteLock dagLock = dagSpecificLocks.get(dagName);
+ ReadWriteLock dagLock = dagSpecificLocks.get(queryIdentifier);
if (dagLock == null) {
dagLock = new ReentrantReadWriteLock();
- dagSpecificLocks.put(dagName, dagLock);
+ dagSpecificLocks.put(queryIdentifier, dagLock);
}
return dagLock;
} finally {
@@ -247,20 +247,20 @@ public class QueryTracker extends AbstractService {
}
}
- private ConcurrentMap<String, SourceStateProto> getSourceCompletionMap(String dagName) {
- ConcurrentMap<String, SourceStateProto> dagMap = sourceCompletionMap.get(dagName);
+ private ConcurrentMap<String, SourceStateProto> getSourceCompletionMap(QueryIdentifier queryIdentifier) {
+ ConcurrentMap<String, SourceStateProto> dagMap = sourceCompletionMap.get(queryIdentifier);
if (dagMap == null) {
dagMap = new ConcurrentHashMap<>();
ConcurrentMap<String, SourceStateProto> old =
- sourceCompletionMap.putIfAbsent(dagName, dagMap);
+ sourceCompletionMap.putIfAbsent(queryIdentifier, dagMap);
dagMap = (old != null) ? old : dagMap;
}
return dagMap;
}
- public void registerDagQueryId(String dagName, String queryId) {
- if (queryId == null) return;
- dagNameToQueryId.putIfAbsent(dagName, queryId);
+ public void registerDagQueryId(QueryIdentifier queryIdentifier, String hiveQueryIdString) {
+ if (hiveQueryIdString == null) return;
+ queryIdentifierToHiveQueryId.putIfAbsent(queryIdentifier, hiveQueryIdString);
}
@Override
@@ -302,15 +302,15 @@ public class QueryTracker extends AbstractService {
}
private class DagMapCleanerCallable extends CallableWithNdc<Void> {
- private final String dagName;
+ private final QueryIdentifier queryIdentifier;
- private DagMapCleanerCallable(String dagName) {
- this.dagName = dagName;
+ private DagMapCleanerCallable(QueryIdentifier queryIdentifier) {
+ this.queryIdentifier = queryIdentifier;
}
@Override
protected Void callInternal() {
- completedDagMap.remove(dagName);
+ completedDagMap.remove(queryIdentifier);
return null;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index f03a2ff..b60f71f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -131,7 +131,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
// Register with the AMReporter when the callable is setup. Unregister once it starts running.
if (jobToken != null) {
this.amReporter.registerTask(request.getAmHost(), request.getAmPort(),
- request.getUser(), jobToken, null, request.getFragmentSpec().getDagName());
+ request.getUser(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier());
}
this.metrics = metrics;
this.requestId = request.getFragmentSpec().getFragmentIdentifierString();
@@ -297,9 +297,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
*/
public void reportTaskKilled() {
killedTaskHandler
- .taskKilled(request.getAmHost(), request.getAmPort(), request.getUser(), jobToken, null,
- taskSpec.getDAGName(),
- taskSpec.getTaskAttemptID());
+ .taskKilled(request.getAmHost(), request.getAmPort(), request.getUser(), jobToken,
+ fragmentInfo.getQueryInfo().getQueryIdentifier(), taskSpec.getTaskAttemptID());
}
public boolean canFinish() {
@@ -428,6 +427,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
HistoryLogger
.logFragmentEnd(request.getApplicationIdString(), request.getContainerIdString(),
executionContext.getHostName(), request.getFragmentSpec().getDagName(),
+ fragmentInfo.getQueryInfo().getDagIdentifier(),
request.getFragmentSpec().getVertexName(),
request.getFragmentSpec().getFragmentNumber(),
request.getFragmentSpec().getAttemptNumber(), taskRunnerCallable.threadName,
@@ -445,6 +445,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
HistoryLogger
.logFragmentEnd(request.getApplicationIdString(), request.getContainerIdString(),
executionContext.getHostName(), request.getFragmentSpec().getDagName(),
+ fragmentInfo.getQueryInfo().getDagIdentifier(),
request.getFragmentSpec().getVertexName(),
request.getFragmentSpec().getFragmentNumber(),
request.getFragmentSpec().getAttemptNumber(), taskRunnerCallable.threadName,
http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java
index 7428a6a..f61d62f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java
@@ -262,5 +262,4 @@ public class Converters {
throw new RuntimeException("Unexpected state: " + state);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index 5c370ee..ae7d291 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.llap.LlapNodeId;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
@@ -85,7 +86,8 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
private static final boolean isDebugEnabed = LOG.isDebugEnabled();
private final SubmitWorkRequestProto BASE_SUBMIT_WORK_REQUEST;
- private final ConcurrentMap<String, ByteBuffer> credentialMap;
+
+ private final ConcurrentMap<QueryIdentifierProto, ByteBuffer> credentialMap;
// Tracks containerIds and taskAttemptIds, so can be kept independent of the running DAG.
// When DAG specific cleanup happens, it'll be better to link this to a DAG though.
@@ -104,7 +106,8 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
private final ConcurrentMap<LlapNodeId, PingingNodeInfo> pingedNodeMap = new ConcurrentHashMap<>();
- private volatile String currentDagName;
+ private volatile int currentDagId;
+ private volatile QueryIdentifierProto currentQueryIdentifierProto;
public LlapTaskCommunicator(
TaskCommunicatorContext taskCommunicatorContext) {
@@ -226,8 +229,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
int priority) {
super.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials,
credentialsChanged, priority);
- if (taskSpec.getDAGName() != currentDagName) {
- resetCurrentDag(taskSpec.getDAGName());
+ int dagId = taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId();
+ if (currentQueryIdentifierProto == null || (dagId != currentQueryIdentifierProto.getDagIdentifier())) {
+ resetCurrentDag(dagId);
}
@@ -251,7 +255,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
nodesForQuery.add(nodeId);
sourceStateTracker.registerTaskForStateUpdates(host, port, taskSpec.getInputs());
- FragmentRuntimeInfo fragmentRuntimeInfo = sourceStateTracker.getFragmentRuntimeInfo(taskSpec.getDAGName(),
+ FragmentRuntimeInfo fragmentRuntimeInfo = sourceStateTracker.getFragmentRuntimeInfo(
taskSpec.getVertexName(), taskSpec.getTaskAttemptID().getTaskID().getId(), priority);
SubmitWorkRequestProto requestProto;
@@ -349,7 +353,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
// NodeId can be null if the task gets unregistered due to failure / being killed by the daemon itself
if (nodeId != null) {
TerminateFragmentRequestProto request =
- TerminateFragmentRequestProto.newBuilder().setDagName(currentDagName)
+ TerminateFragmentRequestProto.newBuilder().setQueryIdentifier(currentQueryIdentifierProto)
.setFragmentIdentifierString(taskAttemptId.toString()).build();
communicator.sendTerminateFragment(request, nodeId.getHostname(), nodeId.getPort(),
new LlapDaemonProtocolClientProxy.ExecuteRequestCallback<TerminateFragmentResponseProto>() {
@@ -370,12 +374,16 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
}
}
+
+
+
@Override
- public void dagComplete(final String dagName) {
- QueryCompleteRequestProto request = QueryCompleteRequestProto.newBuilder().setDagName(
- dagName).setDeleteDelay(deleteDelayOnDagComplete).build();
+ public void dagComplete(final int dagIdentifier) {
+ QueryCompleteRequestProto request = QueryCompleteRequestProto.newBuilder()
+ .setQueryIdentifier(constructQueryIdentifierProto(dagIdentifier))
+ .setDeleteDelay(deleteDelayOnDagComplete).build();
for (final LlapNodeId llapNodeId : nodesForQuery) {
- LOG.info("Sending dagComplete message for {}, to {}", dagName, llapNodeId);
+ LOG.info("Sending dagComplete message for {}, to {}", dagIdentifier, llapNodeId);
communicator.sendQueryComplete(request, llapNodeId.getHostname(), llapNodeId.getPort(),
new LlapDaemonProtocolClientProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.QueryCompleteResponseProto>() {
@Override
@@ -384,7 +392,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
@Override
public void indicateError(Throwable t) {
- LOG.warn("Failed to indicate dag complete dagId={} to node {}", dagName, llapNodeId);
+ LOG.warn("Failed to indicate dag complete dagId={} to node {}", dagIdentifier, llapNodeId);
}
});
}
@@ -495,12 +503,12 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
}
}
- private void resetCurrentDag(String newDagName) {
+ private void resetCurrentDag(int newDagId) {
// Working on the assumption that a single DAG runs at a time per AM.
- currentDagName = newDagName;
- sourceStateTracker.resetState(newDagName);
+ currentQueryIdentifierProto = constructQueryIdentifierProto(newDagId);
+ sourceStateTracker.resetState(newDagId);
nodesForQuery.clear();
- LOG.info("CurrentDag set to: " + newDagName);
+ LOG.info("CurrentDagId set to: " + newDagId + ", name=" + getContext().getCurrentDagName());
// TODO Is it possible for heartbeats to come in from lost tasks - those should be told to die, which
// is likely already happening.
}
@@ -518,10 +526,12 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
// Credentials can change across DAGs. Ideally construct only once per DAG.
taskCredentials.addAll(getContext().getCredentials());
- ByteBuffer credentialsBinary = credentialMap.get(taskSpec.getDAGName());
+ Preconditions.checkState(currentQueryIdentifierProto.getDagIdentifier() ==
+ taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId());
+ ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto);
if (credentialsBinary == null) {
credentialsBinary = serializeCredentials(getContext().getCredentials());
- credentialMap.putIfAbsent(taskSpec.getDAGName(), credentialsBinary.duplicate());
+ credentialMap.putIfAbsent(currentQueryIdentifierProto, credentialsBinary.duplicate());
} else {
credentialsBinary = credentialsBinary.duplicate();
}
@@ -736,4 +746,10 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
}
}
+
+ private QueryIdentifierProto constructQueryIdentifierProto(int dagIdentifier) {
+ return QueryIdentifierProto.newBuilder()
+ .setAppIdentifier(getContext().getCurrentAppIdentifier()).setDagIdentifier(dagIdentifier)
+ .build();
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
index 066fae5..628fe9c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
@@ -24,6 +24,8 @@ import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.hive.llap.daemon.impl.QueryIdentifier;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.llap.LlapNodeId;
@@ -45,28 +47,33 @@ public class SourceStateTracker {
private final TaskCommunicatorContext taskCommunicatorContext;
private final LlapTaskCommunicator taskCommunicator;
+ private final QueryIdentifierProto BASE_QUERY_IDENTIFIER;
+
// Tracks vertices for which notifications have been registered
private final Set<String> notificationRegisteredVertices = new HashSet<>();
private final Map<String, SourceInfo> sourceInfoMap = new HashMap<>();
private final Map<LlapNodeId, NodeInfo> nodeInfoMap = new HashMap<>();
- private volatile String currentDagName;
+ private volatile QueryIdentifierProto currentQueryIdentifier;
public SourceStateTracker(TaskCommunicatorContext taskCommunicatorContext,
LlapTaskCommunicator taskCommunicator) {
this.taskCommunicatorContext = taskCommunicatorContext;
this.taskCommunicator = taskCommunicator;
+ BASE_QUERY_IDENTIFIER = QueryIdentifierProto.newBuilder()
+ .setAppIdentifier(taskCommunicatorContext.getCurrentAppIdentifier()).build();
}
/**
* To be invoked after each DAG completes.
*/
- public synchronized void resetState(String newDagName) {
+ public synchronized void resetState(int newDagId) {
sourceInfoMap.clear();
nodeInfoMap.clear();
notificationRegisteredVertices.clear();
- this.currentDagName = newDagName;
+ this.currentQueryIdentifier =
+ QueryIdentifierProto.newBuilder(BASE_QUERY_IDENTIFIER).setDagIdentifier(newDagId).build();
}
/**
@@ -139,16 +146,16 @@ public class SourceStateTracker {
}
+ // Assumes serialized DAGs within an AM, and a reset of structures after each DAG completes.
/**
* Constructs FragmentRuntimeInfo for scheduling within LLAP daemons.
* Also caches state based on state updates.
- * @param dagName
* @param vertexName
* @param fragmentNumber
* @param priority
* @return
*/
- public synchronized FragmentRuntimeInfo getFragmentRuntimeInfo(String dagName, String vertexName, int fragmentNumber,
+ public synchronized FragmentRuntimeInfo getFragmentRuntimeInfo(String vertexName, int fragmentNumber,
int priority) {
FragmentRuntimeInfo.Builder builder = FragmentRuntimeInfo.newBuilder();
maybeRegisterForVertexUpdates(vertexName);
@@ -282,9 +289,8 @@ public class SourceStateTracker {
void sendStateUpdateToNode(LlapNodeId nodeId, String sourceName, VertexState state) {
taskCommunicator.sendStateUpdate(nodeId.getHostname(), nodeId.getPort(),
- SourceStateUpdatedRequestProto.newBuilder().setDagName(currentDagName).setSrcName(
- sourceName)
- .setState(Converters.fromVertexState(state)).build());
+ SourceStateUpdatedRequestProto.newBuilder().setQueryIdentifier(currentQueryIdentifier)
+ .setSrcName(sourceName).setState(Converters.fromVertexState(state)).build());
}
http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/protobuf/LlapDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/llap-server/src/protobuf/LlapDaemonProtocol.proto b/llap-server/src/protobuf/LlapDaemonProtocol.proto
index a2d944f..944c96c 100644
--- a/llap-server/src/protobuf/LlapDaemonProtocol.proto
+++ b/llap-server/src/protobuf/LlapDaemonProtocol.proto
@@ -50,6 +50,7 @@ message GroupInputSpecProto {
message FragmentSpecProto {
optional string fragment_identifier_string = 1;
optional string dag_name = 2;
+ optional int32 dag_id = 11;
optional string vertex_name = 3;
optional EntityDescriptorProto processor_descriptor = 4;
repeated IOSpecProto input_specs = 5;
@@ -74,6 +75,11 @@ enum SourceStateProto {
S_RUNNING = 2;
}
+message QueryIdentifierProto {
+ optional string app_identifier = 1;
+ optional int32 dag_identifier = 2;
+}
+
message SubmitWorkRequestProto {
optional string container_id_string = 1;
optional string am_host = 2;
@@ -98,7 +104,7 @@ message SubmitWorkResponseProto {
}
message SourceStateUpdatedRequestProto {
- optional string dag_name = 1;
+ optional QueryIdentifierProto query_identifier = 1;
optional string src_name = 2;
optional SourceStateProto state = 3;
}
@@ -108,17 +114,16 @@ message SourceStateUpdatedResponseProto {
message QueryCompleteRequestProto {
optional string query_id = 1;
- optional string dag_name = 2;
- optional int64 delete_delay = 3 [default = 0];
+ optional QueryIdentifierProto query_identifier = 2;
+ optional int64 delete_delay = 4 [default = 0];
}
message QueryCompleteResponseProto {
}
message TerminateFragmentRequestProto {
- optional string query_id = 1;
- optional string dag_name = 2;
- optional string fragment_identifier_string = 7;
+ optional QueryIdentifierProto query_identifier = 1;
+ optional string fragment_identifier_string = 2;
}
message TerminateFragmentResponseProto {
http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index 38af07e..ef49714 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -16,6 +16,7 @@ package org.apache.hadoop.hive.llap.daemon.impl;
import static org.mockito.Mockito.mock;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
@@ -25,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
import org.apache.hadoop.security.Credentials;
@@ -48,18 +50,36 @@ public class TaskExecutorTestHelpers {
SubmitWorkRequestProto
requestProto = createSubmitWorkRequestProto(fragmentNum, parallelism,
startTime);
- MockRequest mockRequest = new MockRequest(requestProto, canFinish, workTime);
+ QueryFragmentInfo queryFragmentInfo = createQueryFragmentInfo(requestProto.getFragmentSpec());
+ MockRequest mockRequest = new MockRequest(requestProto, queryFragmentInfo, canFinish, workTime);
return mockRequest;
}
public static TaskExecutorService.TaskWrapper createTaskWrapper(
SubmitWorkRequestProto request, boolean canFinish, int workTime) {
- MockRequest mockRequest = new MockRequest(request, canFinish, workTime);
+ QueryFragmentInfo queryFragmentInfo = createQueryFragmentInfo(request.getFragmentSpec());
+ MockRequest mockRequest = new MockRequest(request, queryFragmentInfo, canFinish, workTime);
TaskExecutorService.TaskWrapper
taskWrapper = new TaskExecutorService.TaskWrapper(mockRequest, null);
return taskWrapper;
}
+ public static QueryFragmentInfo createQueryFragmentInfo(FragmentSpecProto fragmentSpecProto) {
+ QueryInfo queryInfo = createQueryInfo();
+ QueryFragmentInfo fragmentInfo =
+ new QueryFragmentInfo(queryInfo, "fakeVertexName", fragmentSpecProto.getFragmentNumber(), 0,
+ fragmentSpecProto);
+ return fragmentInfo;
+ }
+
+ public static QueryInfo createQueryInfo() {
+ QueryIdentifier queryIdentifier = new QueryIdentifier("fake_app_id_string", 1);
+ QueryInfo queryInfo =
+ new QueryInfo(queryIdentifier, "fake_app_id_string", "fake_dag_name", 1, "fakeUser",
+ new ConcurrentHashMap<String, LlapDaemonProtocolProtos.SourceStateProto>(),
+ new String[0], null);
+ return queryInfo;
+ }
public static SubmitWorkRequestProto createSubmitWorkRequestProto(
int fragmentNumber, int selfAndUpstreamParallelism,
@@ -80,7 +100,7 @@ public class TaskExecutorTestHelpers {
return SubmitWorkRequestProto
.newBuilder()
.setFragmentSpec(
- LlapDaemonProtocolProtos.FragmentSpecProto
+ FragmentSpecProto
.newBuilder()
.setAttemptNumber(0)
.setDagName("MockDag")
@@ -119,9 +139,9 @@ public class TaskExecutorTestHelpers {
private boolean shouldSleep = true;
private final Condition finishedCondition = lock.newCondition();
- public MockRequest(SubmitWorkRequestProto requestProto,
+ public MockRequest(SubmitWorkRequestProto requestProto, QueryFragmentInfo fragmentInfo,
boolean canFinish, long workTime) {
- super(requestProto, mock(QueryFragmentInfo.class), new Configuration(),
+ super(requestProto, fragmentInfo, new Configuration(),
new ExecutionContextImpl("localhost"), null, new Credentials(), 0, null, null, mock(
LlapDaemonExecutorMetrics.class),
mock(KilledTaskHandler.class), mock(
http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestQueryIdentifier.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestQueryIdentifier.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestQueryIdentifier.java
new file mode 100644
index 0000000..39a3865
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestQueryIdentifier.java
@@ -0,0 +1,48 @@
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import org.junit.Test;
+
+public class TestQueryIdentifier {
+
+ @Test (timeout = 5000)
+ public void testEquality() {
+
+ String appIdString1 = "app1";
+ String appIdString2 = "app2";
+
+ int dagId1 = 1;
+ int dagId2 = 2;
+
+ QueryIdentifier[] queryIdentifiers = new QueryIdentifier[4];
+
+ queryIdentifiers[0] = new QueryIdentifier(appIdString1, dagId1);
+ queryIdentifiers[1] = new QueryIdentifier(appIdString1, dagId2);
+ queryIdentifiers[2] = new QueryIdentifier(appIdString2, dagId1);
+ queryIdentifiers[3] = new QueryIdentifier(appIdString2, dagId2);
+
+ for (int i = 0 ; i < 4 ; i++) {
+ for (int j = 0 ; j < 4 ; j++) {
+ if (i == j) {
+ assertEquals(queryIdentifiers[i], queryIdentifiers[j]);
+ } else {
+ assertNotEquals(queryIdentifiers[i], queryIdentifiers[j]);
+ }
+ }
+ }
+
+ QueryIdentifier q11 = new QueryIdentifier(appIdString1, dagId1);
+ QueryIdentifier q12 = new QueryIdentifier(appIdString1, dagId2);
+ QueryIdentifier q21 = new QueryIdentifier(appIdString2, dagId1);
+ QueryIdentifier q22 = new QueryIdentifier(appIdString2, dagId2);
+
+ assertEquals(queryIdentifiers[0], q11);
+ assertEquals(queryIdentifiers[1], q12);
+ assertEquals(queryIdentifiers[2], q21);
+ assertEquals(queryIdentifiers[3], q22);
+
+
+ }
+}