You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/03/29 02:15:08 UTC
git commit: TEZ-989. Allow addition of resources to the AM for a
running session. (sseth)
Repository: incubator-tez
Updated Branches:
refs/heads/master f34c7f320 -> 34a557d13
TEZ-989. Allow addition of resources to the AM for a running session.
(sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/34a557d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/34a557d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/34a557d1
Branch: refs/heads/master
Commit: 34a557d1318aee9b23f5ac7ded9bccfd72508de1
Parents: f34c7f3
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Mar 28 18:14:51 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Mar 28 18:14:51 2014 -0700
----------------------------------------------------------------------
.../org/apache/tez/client/TezClientUtils.java | 13 +-
.../java/org/apache/tez/client/TezSession.java | 57 ++++++-
.../src/main/proto/DAGClientAMProtocol.proto | 1 +
.../org/apache/tez/common/RuntimeUtils.java | 117 +++++++++++++++
.../apache/hadoop/mapred/YarnTezDagChild.java | 49 ++-----
...DAGClientAMProtocolBlockingPBServerImpl.java | 13 +-
.../org/apache/tez/dag/app/DAGAppMaster.java | 147 ++++++++++++++++---
.../org/apache/tez/dag/app/RecoveryParser.java | 7 +-
.../dag/app/dag/RootInputInitializerRunner.java | 8 +-
.../dag/app/dag/event/DAGEventRecoverEvent.java | 21 ++-
.../tez/dag/app/dag/event/DAGEventStartDag.java | 38 +++++
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 20 ++-
.../org/apache/tez/dag/app/dag/impl/Edge.java | 2 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 2 +-
.../tez/dag/app/dag/impl/VertexManager.java | 2 +-
.../dag/history/events/DAGSubmittedEvent.java | 29 +++-
.../tez/dag/utils/RelocalizationUtils.java | 68 +++++++++
tez-dag/src/main/proto/HistoryEvents.proto | 3 +-
.../tez/dag/app/dag/impl/TestDAGImpl.java | 5 +-
.../TestHistoryEventsProtoConversion.java | 4 +-
.../split/TezGroupedSplitsInputFormat.java | 11 +-
.../split/TezGroupedSplitsInputFormat.java | 7 +-
.../runtime/LogicalIOProcessorRuntimeTask.java | 1 +
.../org/apache/tez/runtime/RuntimeUtils.java | 116 ---------------
.../apache/tez/runtime/TestRuntimeUtils.java | 1 +
.../vertexmanager/TestShuffleVertexManager.java | 2 +-
.../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 131 ++++++++++++++++-
.../java/org/apache/tez/test/TestTezJobs.java | 1 -
tez-tests/src/test/resources/test_jar | Bin 0 -> 1101 bytes
29 files changed, 649 insertions(+), 227 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 8b87fb0..2ec1939 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -34,7 +34,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
-import java.util.UUID;
import java.util.Vector;
import java.util.Map.Entry;
@@ -55,7 +54,6 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
@@ -480,7 +478,16 @@ public class TezClientUtils {
DagTypeConverters.convertFromLocalResources(sessionJars);
sessionJarsPBOutStream = FileSystem.create(fs, sessionJarsPath,
new FsPermission(TEZ_AM_FILE_PERMISSION));
- proto.writeTo(sessionJarsPBOutStream);
+ proto.writeDelimitedTo(sessionJarsPBOutStream);
+
+ // Write out the initial list of resources which will be available in the AM
+ DAGProtos.PlanLocalResourcesProto amResourceProto;
+ if (amConfig.getLocalResources() != null && !amConfig.getLocalResources().isEmpty()) {
+ amResourceProto = DagTypeConverters.convertFromLocalResources(localResources);
+ } else {
+ amResourceProto = DAGProtos.PlanLocalResourcesProto.getDefaultInstance();
+ }
+ amResourceProto.writeDelimitedTo(sessionJarsPBOutStream);
} finally {
if (sessionJarsPBOutStream != null) {
sessionJarsPBOutStream.close();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-api/src/main/java/org/apache/tez/client/TezSession.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSession.java b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
index ff7f00a..f131005 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezSession.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.tez.common.TezYARNUtils;
@@ -54,6 +55,7 @@ import org.apache.tez.dag.api.client.rpc.DAGClientRPCImpl;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.protobuf.ServiceException;
public class TezSession {
@@ -137,14 +139,50 @@ public class TezSession {
* Submit a DAG to a Tez Session. Blocks until either the DAG is submitted to
* the session or configured timeout period expires. Cleans up session if the
* submission timed out.
+ *
+ * Recommended API.
+ *
* @param dag DAG to be submitted to Session
* @return DAGClient to monitor the DAG
* @throws TezException
* @throws IOException
* @throws SessionNotRunning if session is not alive
* @throws DAGSubmissionTimedOut if submission timed out
+ */
+ public synchronized DAGClient submitDAG(DAG dag) throws TezException, IOException,
+ InterruptedException {
+ return submitDAG(dag, null);
+ }
+
+ /**
+ * Submit a DAG to a Tez Session. Blocks until either the DAG is submitted to
+ * the session or configured timeout period expires. Cleans up session if the
+ * submission timed out.
+ *
+ *
+ *
+ * <p>Allows specification of additional resources which will be added to the
+ * running AM's classpath for execution of this DAG.</p>
+ * <p>Caveats: Resources stack across DAG submissions and are never removed from the classpath. Only
+ * LocalResourceType.FILE is supported. All resources will be treated as private.</p>
+ *
+ * <p>This method is not recommended unless there is no choice but to add resources to an existing session.
+ * The recommended approach is to set up AM local resources up front via AMConfiguration.</p>
+ *
+ * @param dag
+ * DAG to be submitted to Session
+ * @param additionalAmResources
+ * additional resources which should be localized in the AM while
+ * executing this DAG.
+ * @return DAGClient to monitor the DAG
+ * @throws TezException
+ * @throws IOException
+ * @throws SessionNotRunning
+ * if session is not alive
+ * @throws DAGSubmissionTimedOut
+ * if submission timed out
*/
- public synchronized DAGClient submitDAG(DAG dag)
+ public synchronized DAGClient submitDAG(DAG dag, Map<String, LocalResource> additionalAmResources)
throws TezException, IOException, InterruptedException {
verifySessionStateForSubmission();
@@ -152,6 +190,13 @@ public class TezSession {
LOG.info("Submitting dag to TezSession"
+ ", sessionName=" + sessionName
+ ", applicationId=" + applicationId);
+
+ if (additionalAmResources != null && !additionalAmResources.isEmpty()) {
+ for (LocalResource lr : additionalAmResources.values()) {
+ Preconditions.checkArgument(lr.getType() == LocalResourceType.FILE, "LocalResourceType: "
+ + lr.getType() + " is not supported, only " + LocalResourceType.FILE + " is supported");
+ }
+ }
// Obtain DAG specific credentials.
TezClientUtils.setupDAGCredentials(dag, sessionCredentials, sessionConfig.getTezConfiguration());
@@ -167,8 +212,12 @@ public class TezSession {
}
DAGPlan dagPlan = dag.createDag(sessionConfig.getTezConfiguration());
- SubmitDAGRequestProto requestProto =
- SubmitDAGRequestProto.newBuilder().setDAGPlan(dagPlan).build();
+ SubmitDAGRequestProto.Builder requestBuilder = SubmitDAGRequestProto.newBuilder();
+ requestBuilder.setDAGPlan(dagPlan).build();
+ if (additionalAmResources != null && !additionalAmResources.isEmpty()) {
+ requestBuilder.setAdditionalAmResources(DagTypeConverters
+ .convertFromLocalResources(additionalAmResources));
+ }
DAGClientAMProtocolBlockingPB proxy = waitForProxy();
if (proxy == null) {
@@ -183,7 +232,7 @@ public class TezSession {
}
try {
- dagId = proxy.submitDAG(null, requestProto).getDagId();
+ dagId = proxy.submitDAG(null, requestBuilder.build()).getDagId();
} catch (ServiceException e) {
throw new TezException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-api/src/main/proto/DAGClientAMProtocol.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGClientAMProtocol.proto b/tez-api/src/main/proto/DAGClientAMProtocol.proto
index 525679e..8d33a19 100644
--- a/tez-api/src/main/proto/DAGClientAMProtocol.proto
+++ b/tez-api/src/main/proto/DAGClientAMProtocol.proto
@@ -61,6 +61,7 @@ message TryKillDAGResponseProto {
message SubmitDAGRequestProto {
optional DAGPlan d_a_g_plan = 1;
+ optional PlanLocalResourcesProto additional_am_resources = 2;
}
message SubmitDAGResponseProto {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-common/src/main/java/org/apache/tez/common/RuntimeUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/RuntimeUtils.java b/tez-common/src/main/java/org/apache/tez/common/RuntimeUtils.java
new file mode 100644
index 0000000..c3d98f7
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/RuntimeUtils.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+
+@Private
+public class RuntimeUtils {
+
+  /** Classes already resolved by {@link #getClazz(String)}, keyed by fully qualified name. */
+  private static final Map<String, Class<?>> CLAZZ_CACHE = new ConcurrentHashMap<String, Class<?>>();
+
+  /**
+   * Resolves a class by name using the current thread's context class loader and
+   * remembers the result so later lookups of the same name skip {@link Class#forName}.
+   *
+   * @param className fully qualified name of the class to load
+   * @return the resolved class
+   * @throws TezUncheckedException if the class cannot be found
+   */
+  @Private
+  public static Class<?> getClazz(String className) {
+    Class<?> clazz = CLAZZ_CACHE.get(className);
+    if (clazz == null) {
+      try {
+        clazz = Class.forName(className, true, Thread.currentThread().getContextClassLoader());
+      } catch (ClassNotFoundException e) {
+        throw new TezUncheckedException("Unable to load class: " + className, e);
+      }
+      // Only successful lookups are cached, so a class that becomes loadable after
+      // resources are added to the classpath is still found on a later call.
+      CLAZZ_CACHE.put(className, clazz);
+    }
+    return clazz;
+  }
+
+  /**
+   * Instantiates {@code clazz} through its no-argument constructor.
+   *
+   * @throws TezUncheckedException if the class cannot be instantiated or the
+   *           constructor is not accessible
+   */
+  private static <T> T getNewInstance(Class<T> clazz) {
+    T instance;
+    try {
+      instance = clazz.newInstance();
+    } catch (InstantiationException e) {
+      throw new TezUncheckedException(
+          "Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
+    } catch (IllegalAccessException e) {
+      throw new TezUncheckedException(
+          "Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
+    }
+    return instance;
+  }
+
+  /**
+   * Loads {@code className} (see {@link #getClazz(String)}) and creates a new instance of it.
+   *
+   * @param className fully qualified name of the class to instantiate
+   * @return a new instance, cast to the type expected by the caller
+   * @throws TezUncheckedException if the class cannot be loaded or instantiated
+   */
+  @Private
+  public static <T> T createClazzInstance(String className) {
+    Class<?> clazz = getClazz(className);
+    // Unchecked: the caller is responsible for requesting a compatible type.
+    @SuppressWarnings("unchecked")
+    T instance = (T) getNewInstance(clazz);
+    return instance;
+  }
+
+  /**
+   * Makes the given URLs visible to the current thread by installing a new
+   * {@link URLClassLoader}, parented on the current context class loader, as the
+   * thread's context class loader. Does nothing for an empty list.
+   *
+   * @param urls locations to add to the classpath
+   */
+  @Private
+  public static synchronized void addResourcesToClasspath(List<URL> urls) {
+    if (urls.isEmpty()) {
+      // Avoid stacking a useless, empty class loader on every call.
+      return;
+    }
+    ClassLoader classLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]),
+        Thread.currentThread().getContextClassLoader());
+    Thread.currentThread().setContextClassLoader(classLoader);
+  }
+
+  // Parameters for addResourcesToSystemClassLoader
+  private static final Class<?>[] parameters = new Class[]{URL.class};
+  /** Reflective handle on URLClassLoader#addURL, looked up lazily on first use. */
+  private static Method sysClassLoaderMethod = null;
+
+  /**
+   * Appends the given URLs to the system class loader by reflectively invoking the
+   * protected {@code URLClassLoader#addURL} method. Does nothing for an empty list.
+   *
+   * @param urls locations to add
+   * @throws TezException if the system class loader is not a {@link URLClassLoader}
+   *           (as on Java 9 and later), or if addURL cannot be located or invoked
+   */
+  @Private
+  public static synchronized void addResourcesToSystemClassLoader(List<URL> urls) throws TezException {
+    if (urls.isEmpty()) {
+      return;
+    }
+    ClassLoader systemLoader = ClassLoader.getSystemClassLoader();
+    if (!(systemLoader instanceof URLClassLoader)) {
+      // Report through the declared checked exception instead of an unchecked ClassCastException.
+      throw new TezException("Cannot add resources to system class loader ["
+          + systemLoader + "]: it is not a URLClassLoader");
+    }
+    URLClassLoader sysLoader = (URLClassLoader) systemLoader;
+
+    if (sysClassLoaderMethod == null) {
+      Method method;
+      try {
+        method = URLClassLoader.class.getDeclaredMethod("addURL", parameters);
+      } catch (SecurityException e) {
+        throw new TezException("Failed to get handle on method addURL", e);
+      } catch (NoSuchMethodException e) {
+        throw new TezException("Failed to get handle on method addURL", e);
+      }
+      method.setAccessible(true);
+      sysClassLoaderMethod = method;
+    }
+    for (URL url : urls) {
+      try {
+        sysClassLoaderMethod.invoke(sysLoader, new Object[] { url });
+      } catch (IllegalArgumentException e) {
+        throw new TezException("Failed to invoke addURL for rsrc: " + url, e);
+      } catch (IllegalAccessException e) {
+        throw new TezException("Failed to invoke addURL for rsrc: " + url, e);
+      } catch (InvocationTargetException e) {
+        throw new TezException("Failed to invoke addURL for rsrc: " + url, e);
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index d64cc7f..d68dfad 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -23,6 +23,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.URI;
import java.net.URL;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
@@ -32,7 +33,6 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -70,8 +70,8 @@ import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.utils.RelocalizationUtils;
import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
-import org.apache.tez.runtime.RuntimeUtils;
import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
@@ -87,8 +87,10 @@ import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
import org.apache.tez.runtime.common.objectregistry.ObjectRegistryModule;
import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+import com.google.common.base.Function;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.inject.Guice;
import com.google.inject.Injector;
@@ -490,7 +492,18 @@ public class YarnTezDagChild {
if (LOG.isDebugEnabled()) {
LOG.debug("Additional Resources added to container: " + additionalResources);
}
- processAdditionalResources(additionalResources, defaultConf);
+
+ LOG.info("Localizing additional local resources for Task : " + additionalResources);
+ List<URL> downloadedUrls = RelocalizationUtils.processAdditionalResources(
+ Maps.transformValues(additionalResources, new Function<TezLocalResource, URI>() {
+ @Override
+ public URI apply(TezLocalResource input) {
+ return input.getUri();
+ }
+ }), defaultConf);
+ RelocalizationUtils.addUrlsToClassPath(downloadedUrls);
+
+ LOG.info("Done localizing additional resources");
final TaskSpec taskSpec = containerTask.getTaskSpec();
if (LOG.isDebugEnabled()) {
LOG.debug("New container task context:"
@@ -708,34 +721,4 @@ public class YarnTezDagChild {
}
TezUtils.updateLoggers(addend);
}
-
- private static void processAdditionalResources(Map<String, TezLocalResource> additionalResources,
- Configuration conf) throws IOException, TezException {
- if (additionalResources == null || additionalResources.isEmpty()) {
- return;
- }
-
- LOG.info("Downloading " + additionalResources.size() + "additional resources");
- List<URL> urls = Lists.newArrayListWithCapacity(additionalResources.size());
-
- for (Entry<String, TezLocalResource> lrEntry : additionalResources.entrySet()) {
- Path dFile = downloadResource(lrEntry.getKey(), lrEntry.getValue(), conf);
- urls.add(dFile.toUri().toURL());
- }
- RuntimeUtils.addResourcesToClasspath(urls);
- LOG.info("Done localizing additional resources");
- }
-
- private static Path downloadResource(String destName, TezLocalResource resource,
- Configuration conf) throws IOException {
- resource.getUri();
- FileSystem fs = FileSystem.get(resource.getUri(), conf);
- Path cwd = new Path(System.getenv(Environment.PWD.name()));
- Path dFile = new Path(cwd, destName);
- Path srcPath = new Path(resource.getUri());
- fs.copyToLocalFile(srcPath, dFile);
- return dFile.makeQualified(FileSystem.getLocal(conf).getUri(), cwd);
- }
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
index cca0827..f205f9f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
@@ -19,7 +19,9 @@
package org.apache.tez.dag.api.client.rpc;
import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.client.TezSessionStatus;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.TezException;
@@ -49,8 +51,7 @@ import org.apache.tez.dag.app.DAGAppMaster.DAGClientHandler;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
-public class DAGClientAMProtocolBlockingPBServerImpl implements
- DAGClientAMProtocolBlockingPB {
+public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProtocolBlockingPB {
DAGClientHandler real;
@@ -122,7 +123,12 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements
SubmitDAGRequestProto request) throws ServiceException {
try{
DAGPlan dagPlan = request.getDAGPlan();
- String dagId = real.submitDAG(dagPlan);
+ Map<String, LocalResource> additionalResources = null;
+ if (request.hasAdditionalAmResources()) {
+ additionalResources = DagTypeConverters.convertFromPlanLocalResources(request
+ .getAdditionalAmResources());
+ }
+ String dagId = real.submitDAG(dagPlan, additionalResources);
return SubmitDAGResponseProto.newBuilder().setDagId(dagId).build();
} catch(TezException e) {
throw wrapException(e);
@@ -166,5 +172,4 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements
throw wrapException(e);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 526bea7..6a92161 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -25,6 +25,9 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
@@ -34,6 +37,7 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
@@ -81,6 +85,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.client.PreWarmContext;
import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.common.TezConverterUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.Limits;
import org.apache.tez.common.impl.LogUtils;
@@ -111,6 +116,7 @@ import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
@@ -138,10 +144,13 @@ import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.history.utils.DAGUtils;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.utils.Graph;
+import org.apache.tez.dag.utils.RelocalizationUtils;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.codehaus.jettison.json.JSONException;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.Maps;
/**
* The Map-Reduce Application Master.
@@ -201,6 +210,8 @@ public class DAGAppMaster extends AbstractService {
private VertexEventDispatcher vertexEventDispatcher;
private TaskSchedulerEventHandler taskSchedulerEventHandler;
private HistoryEventHandler historyEventHandler;
+ private final Map<String, LocalResource> amResources = new HashMap<String, LocalResource>();
+ private final Map<String, LocalResource> cumulativeAdditionalResources = new HashMap<String, LocalResource>();
private final Map<String, LocalResource> sessionResources =
new HashMap<String, LocalResource>();
@@ -370,10 +381,14 @@ public class DAGAppMaster extends AbstractService {
try {
sessionResourcesStream = new FileInputStream(
TezConfiguration.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME);
- PlanLocalResourcesProto localResourcesProto =
- PlanLocalResourcesProto.parseFrom(sessionResourcesStream);
+ PlanLocalResourcesProto sessionLocalResourcesProto =
+ PlanLocalResourcesProto.parseDelimitedFrom(sessionResourcesStream);
+ PlanLocalResourcesProto amLocalResourceProto = PlanLocalResourcesProto
+ .parseDelimitedFrom(sessionResourcesStream);
sessionResources.putAll(DagTypeConverters.convertFromPlanLocalResources(
- localResourcesProto));
+ sessionLocalResourcesProto));
+ amResources.putAll(DagTypeConverters.convertFromPlanLocalResources(amLocalResourceProto));
+ amResources.putAll(sessionResources);
} finally {
if (sessionResourcesStream != null) {
sessionResourcesStream.close();
@@ -902,8 +917,8 @@ public class DAGAppMaster extends AbstractService {
}
}
- synchronized String submitDAGToAppMaster(DAGPlan dagPlan)
- throws TezException {
+ synchronized String submitDAGToAppMaster(DAGPlan dagPlan,
+ Map<String, LocalResource> additionalResources) throws TezException {
if(currentDAG != null
&& !state.equals(DAGAppMasterState.IDLE)) {
throw new TezException("App master already running a DAG");
@@ -919,6 +934,8 @@ public class DAGAppMaster extends AbstractService {
LOG.info("Starting DAG submitted via RPC");
if (LOG.isDebugEnabled()) {
+ LOG.debug("Invoked with additional local resources: " + additionalResources);
+
LOG.debug("Writing DAG plan to: "
+ TezConfiguration.TEZ_PB_PLAN_TEXT_NAME);
@@ -935,7 +952,7 @@ public class DAGAppMaster extends AbstractService {
}
submittedDAGs.incrementAndGet();
- startDAG(dagPlan);
+ startDAG(dagPlan, additionalResources);
return currentDAG.getID().toString();
}
@@ -981,7 +998,7 @@ public class DAGAppMaster extends AbstractService {
+ ", processor=" + preWarmContext.getProcessorDescriptor().getClassName()
+ ", numContainers=" + preWarmContext.getNumTasks()
+ ", containerResource=" + preWarmContext.getResource());
- startDAG(dag.createDag(amConf));
+ startDAG(dag.createDag(amConf), null);
}
public class DAGClientHandler {
@@ -1036,8 +1053,9 @@ public class DAGAppMaster extends AbstractService {
sendEvent(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL));
}
- public synchronized String submitDAG(DAGPlan dagPlan) throws TezException {
- return submitDAGToAppMaster(dagPlan);
+    /**
+     * Submits a DAG plan to the AM, optionally with resources to add to the AM for this DAG.
+     *
+     * @return the id of the submitted DAG, as a string
+     */
+    public synchronized String submitDAG(DAGPlan dagPlan,
+        Map<String, LocalResource> additionalAmResources) throws TezException {
+      final String submittedDagId = submitDAGToAppMaster(dagPlan, additionalAmResources);
+      return submittedDagId;
}
public synchronized void shutdownAM() {
@@ -1075,6 +1093,59 @@ public class DAGAppMaster extends AbstractService {
}
+  /**
+   * Determines which of the requested additional AM resources are not already known to the AM.
+   *
+   * @param additionalResources resources requested for the DAG being submitted; may be null.
+   *          This map is not modified.
+   * @return an empty map for null input; otherwise the entries not already present in
+   *         {@code amResources}, as computed by the container signature matcher
+   * @throws TezUncheckedException if a requested resource reuses the name of an existing
+   *           AM resource but describes a different resource
+   */
+  private Map<String, LocalResource> getAdditionalLocalResourceDiff(
+      Map<String, LocalResource> additionalResources) {
+    if (additionalResources == null) {
+      return Collections.emptyMap();
+    }
+    // Filter a private copy: the caller's map may be unmodifiable, and silently removing
+    // entries from an argument is a surprising side effect.
+    Map<String, LocalResource> candidates =
+        new HashMap<String, LocalResource>(additionalResources);
+    Iterator<Entry<String, LocalResource>> lrIter = candidates.entrySet().iterator();
+    while (lrIter.hasNext()) {
+      Entry<String, LocalResource> lrEntry = lrIter.next();
+      LocalResource existing = amResources.get(lrEntry.getKey());
+      if (existing == null) {
+        continue;
+      }
+      if (!existing.equals(lrEntry.getValue())) {
+        throw new TezUncheckedException(
+            "Cannot add different additional resources with the same name : "
+                + lrEntry.getKey() + ", Existing: [" + existing + "], New: ["
+                + lrEntry.getValue() + "]");
+      }
+      // Identical resource already present in the AM; nothing new to add.
+      lrIter.remove();
+    }
+    return containerSignatureMatcher.getAdditionalResources(amResources, candidates);
+  }
+
+  /**
+   * Localizes the given resources in the AM via RelocalizationUtils and returns the
+   * resulting URLs.
+   *
+   * @param lrDiff resources to localize; null or empty means nothing to do
+   * @return the URLs produced by RelocalizationUtils, or an empty list if there was nothing to do
+   * @throws TezException wrapping any IOException raised while localizing
+   */
+  private List<URL> processAdditionalResources(Map<String, LocalResource> lrDiff)
+      throws TezException {
+    if (lrDiff == null || lrDiff.isEmpty()) {
+      return Collections.emptyList();
+    }
+    LOG.info("Localizing additional local resources for AM : " + lrDiff);
+    // Converts each YARN LocalResource into the URI of its source location.
+    Function<LocalResource, URI> toSourceUri = new Function<LocalResource, URI>() {
+      @Override
+      public URI apply(LocalResource resource) {
+        try {
+          return TezConverterUtils.getURIFromYarnURL(resource.getResource());
+        } catch (URISyntaxException e) {
+          throw new TezUncheckedException("Failed while handling : " + resource, e);
+        }
+      }
+    };
+    final List<URL> localizedUrls;
+    try {
+      localizedUrls = RelocalizationUtils.processAdditionalResources(
+          Maps.transformValues(lrDiff, toSourceUri), getConfig());
+    } catch (IOException e) {
+      throw new TezException(e);
+    }
+    LOG.info("Done downloading additional AM resources");
+    return localizedUrls;
+  }
+
private class RunningAppContext implements AppContext {
private DAG dag;
@@ -1396,7 +1467,7 @@ public class DAGAppMaster extends AbstractService {
}
}
- private RecoveredDAGData recoverDAG() throws IOException {
+ private RecoveredDAGData recoverDAG() throws IOException, TezException {
if (recoveryEnabled) {
if (this.appAttemptID.getAttemptId() > 1) {
LOG.info("Recovering data from previous attempts"
@@ -1424,8 +1495,7 @@ public class DAGAppMaster extends AbstractService {
DefaultMetricsSystem.initialize("DAGAppMaster");
this.appsStartTime = clock.getTime();
- AMStartedEvent startEvent = new AMStartedEvent(appAttemptID,
- appsStartTime);
+ AMStartedEvent startEvent = new AMStartedEvent(appAttemptID, appsStartTime);
historyEventHandler.handle(
new DAGHistoryEvent(startEvent));
@@ -1451,6 +1521,13 @@ public class DAGAppMaster extends AbstractService {
}
if (recoveredDAGData != null) {
+ List<URL> classpathUrls = null;
+ if (recoveredDAGData.cumulativeAdditionalResources != null) {
+ classpathUrls = processAdditionalResources(recoveredDAGData.cumulativeAdditionalResources);
+ amResources.putAll(recoveredDAGData.cumulativeAdditionalResources);
+ cumulativeAdditionalResources.putAll(recoveredDAGData.cumulativeAdditionalResources);
+ }
+
if (recoveredDAGData.isCompleted
|| recoveredDAGData.nonRecoverable) {
LOG.info("Found previous DAG in completed or non-recoverable state"
@@ -1464,21 +1541,21 @@ public class DAGAppMaster extends AbstractService {
if (recoveredDAGData.nonRecoverable) {
DAGEventRecoverEvent recoverDAGEvent =
new DAGEventRecoverEvent(recoveredDAGData.recoveredDAG.getID(),
- DAGState.FAILED);
+ DAGState.FAILED, classpathUrls);
dagEventDispatcher.handle(recoverDAGEvent);
this.state = DAGAppMasterState.RUNNING;
} else {
DAGEventRecoverEvent recoverDAGEvent =
new DAGEventRecoverEvent(recoveredDAGData.recoveredDAG.getID(),
- recoveredDAGData.dagState);
+ recoveredDAGData.dagState, classpathUrls);
dagEventDispatcher.handle(recoverDAGEvent);
this.state = DAGAppMasterState.RUNNING;
}
} else {
LOG.info("Found DAG to recover, dagId=" + recoveredDAGData.recoveredDAG.getID());
_updateLoggers(recoveredDAGData.recoveredDAG, "");
- DAGEvent recoverDAGEvent = new DAGEvent(recoveredDAGData.recoveredDAG.getID(),
- DAGEventType.DAG_RECOVER);
+ DAGEventRecoverEvent recoverDAGEvent = new DAGEventRecoverEvent(
+ recoveredDAGData.recoveredDAG.getID(), classpathUrls);
dagEventDispatcher.handle(recoverDAGEvent);
this.state = DAGAppMasterState.RUNNING;
}
@@ -1706,7 +1783,7 @@ public class DAGAppMaster extends AbstractService {
}
}
- private void startDAG() throws IOException {
+ private void startDAG() throws IOException, TezException {
FileInputStream dagPBBinaryStream = null;
try {
DAGPlan dagPlan = null;
@@ -1716,7 +1793,7 @@ public class DAGAppMaster extends AbstractService {
TezConfiguration.TEZ_PB_PLAN_BINARY_NAME);
dagPlan = DAGPlan.parseFrom(dagPBBinaryStream);
- startDAG(dagPlan);
+ startDAG(dagPlan, null);
} finally {
if (dagPBBinaryStream != null) {
@@ -1725,7 +1802,8 @@ public class DAGAppMaster extends AbstractService {
}
}
- private void startDAG(DAGPlan dagPlan) {
+ private void startDAG(DAGPlan dagPlan, Map<String, LocalResource> additionalAMResources)
+ throws TezException {
long submitTime = this.clock.getTime();
this.state = DAGAppMasterState.RUNNING;
this.appName = dagPlan.getName();
@@ -1739,24 +1817,47 @@ public class DAGAppMaster extends AbstractService {
LOG.debug("DAG has vertex " + v.getName());
}
}
+
+ Map<String, LocalResource> lrDiff = getAdditionalLocalResourceDiff(additionalAMResources);
+ if (lrDiff != null) {
+ amResources.putAll(lrDiff);
+ cumulativeAdditionalResources.putAll(lrDiff);
+ }
LOG.info("Running DAG: " + dagPlan.getName());
// Job name is the same as the app name until we support multiple dags
// for an app later
DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(newDAG.getID(),
- submitTime, dagPlan, this.appAttemptID);
+ submitTime, dagPlan, this.appAttemptID, cumulativeAdditionalResources);
try {
historyEventHandler.handleCriticalEvent(
new DAGHistoryEvent(newDAG.getID(), submittedEvent));
} catch (IOException e) {
throw new RuntimeException(e);
}
- startDAG(newDAG);
+
+ startDAGExecution(newDAG, lrDiff);
}
- private void startDAG(DAG dag) {
+ private void startDAGExecution(DAG dag, final Map<String, LocalResource> additionalAmResources)
+ throws TezException {
currentDAG = dag;
+ // Try localizing the actual resources.
+ List<URL> additionalUrlsForClasspath;
+ try {
+ additionalUrlsForClasspath = dag.getDagUGI().doAs(new PrivilegedExceptionAction<List<URL>>() {
+ @Override
+ public List<URL> run() throws Exception {
+ return processAdditionalResources(additionalAmResources);
+ }
+ });
+ } catch (IOException e) {
+ throw new TezException(e);
+ } catch (InterruptedException e) {
+ throw new TezException(e);
+ }
+
// End of creating the job.
((RunningAppContext) context).setDAG(currentDAG);
@@ -1769,7 +1870,7 @@ public class DAGAppMaster extends AbstractService {
// All components have started, start the job.
/** create a job-start event to get this ball rolling */
- DAGEvent startDagEvent = new DAGEvent(currentDAG.getID(), DAGEventType.DAG_START);
+ DAGEvent startDagEvent = new DAGEventStartDag(currentDAG.getID(), additionalUrlsForClasspath);
/** send the job-start event. this triggers the job execution. */
sendEvent(startDagEvent);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index 1a9b760..093069c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.Task;
@@ -103,6 +104,7 @@ public class RecoveryParser {
public boolean isCompleted = false;
public boolean nonRecoverable = false;
public String reason = null;
+ public Map<String, LocalResource> cumulativeAdditionalResources = null;
}
private static void parseSummaryFile(FSDataInputStream inputStream)
@@ -627,11 +629,14 @@ public class RecoveryParser {
switch (eventType) {
case DAG_SUBMITTED:
{
+ DAGSubmittedEvent submittedEvent = (DAGSubmittedEvent) event;
LOG.info("Recovering from event"
+ ", eventType=" + eventType
+ ", event=" + event.toString());
- recoveredDAGData.recoveredDAG = dagAppMaster.createDAG(((DAGSubmittedEvent) event).getDAGPlan(),
+ recoveredDAGData.recoveredDAG = dagAppMaster.createDAG(submittedEvent.getDAGPlan(),
lastInProgressDAG);
+ recoveredDAGData.cumulativeAdditionalResources = submittedEvent
+ .getCumulativeAdditionalLocalResources();
recoveredDAGData.recoveredDagID = recoveredDAGData.recoveredDAG.getID();
dagAppMaster.setCurrentDAG(recoveredDAGData.recoveredDAG);
if (recoveredDAGData.nonRecoverable) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java
index 7d3cd28..6b65af3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java
@@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.common.RuntimeUtils;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
@@ -148,13 +149,12 @@ public class RootInputInitializerRunner {
return events;
}
- private TezRootInputInitializer createInitializer()
- throws ClassNotFoundException, InstantiationException,
+ private TezRootInputInitializer createInitializer() throws InstantiationException,
IllegalAccessException {
String className = input.getInitializerClassName();
@SuppressWarnings("unchecked")
- Class<? extends TezRootInputInitializer> clazz = (Class<? extends TezRootInputInitializer>) Class
- .forName(className);
+ Class<? extends TezRootInputInitializer> clazz = (Class<? extends TezRootInputInitializer>) RuntimeUtils
+ .getClazz(className);
TezRootInputInitializer initializer = clazz.newInstance();
return initializer;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java
index e64ad13..45e44f3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java
@@ -18,20 +18,37 @@
package org.apache.tez.dag.app.dag.event;
+import java.net.URL;
+import java.util.List;
+
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.records.TezDAGID;
public class DAGEventRecoverEvent extends DAGEvent {
private final DAGState desiredState;
+ private final List<URL> additionalUrlsForClasspath;
- public DAGEventRecoverEvent(TezDAGID dagId, DAGState desiredState) {
+ public DAGEventRecoverEvent(TezDAGID dagId, DAGState desiredState,
+ List<URL> additionalUrlsForClasspath) {
super(dagId, DAGEventType.DAG_RECOVER);
this.desiredState = desiredState;
+ this.additionalUrlsForClasspath = additionalUrlsForClasspath;
}
-
+
+ public DAGEventRecoverEvent(TezDAGID dagId, List<URL> additionalUrlsForClasspath) {
+ this(dagId, null, additionalUrlsForClasspath);
+ }
+
public DAGState getDesiredState() {
return desiredState;
}
+
+ public List<URL> getAdditionalUrlsForClasspath() {
+ return this.additionalUrlsForClasspath;
+ }
+ public boolean hasDesiredState() {
+ return this.desiredState != null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventStartDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventStartDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventStartDag.java
new file mode 100644
index 0000000..9e6d75b
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventStartDag.java
@@ -0,0 +1,38 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import java.net.URL;
+import java.util.List;
+
+import org.apache.tez.dag.records.TezDAGID;
+
+public class DAGEventStartDag extends DAGEvent {
+
+ private final List<URL> additionalUrlsForClasspath;
+
+ public DAGEventStartDag(TezDAGID dagId, List<URL> additionalUrlsForClasspath) {
+ super(dagId, DAGEventType.DAG_START);
+ this.additionalUrlsForClasspath = additionalUrlsForClasspath;
+ }
+
+ public List<URL> getAdditionalUrlsForClasspath() {
+ return this.additionalUrlsForClasspath;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 0e4c504..c92d51b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -19,6 +19,7 @@
package org.apache.tez.dag.app.dag.impl;
import java.io.IOException;
+import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
@@ -88,6 +89,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
+import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
@@ -105,6 +107,7 @@ import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.utils.RelocalizationUtils;
import org.apache.tez.dag.utils.TezBuilderUtils;
import org.apache.tez.runtime.api.OutputCommitter;
@@ -1301,17 +1304,22 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
@Override
public DAGState transition(DAGImpl dag, DAGEvent dagEvent) {
- if (dagEvent instanceof DAGEventRecoverEvent) {
+ DAGEventRecoverEvent recoverEvent = (DAGEventRecoverEvent) dagEvent;
+ if (recoverEvent.hasDesiredState()) {
// DAG completed or final end state known
- DAGEventRecoverEvent recoverEvent = (DAGEventRecoverEvent) dagEvent;
dag.recoveredState = recoverEvent.getDesiredState();
}
+ if (recoverEvent.getAdditionalUrlsForClasspath() != null) {
+ LOG.info("Added additional resources : [" + recoverEvent.getAdditionalUrlsForClasspath()
+ + "] to classpath");
+ RelocalizationUtils.addUrlsToClassPath(recoverEvent.getAdditionalUrlsForClasspath());
+ }
switch (dag.recoveredState) {
case NEW:
// send DAG an Init and start events
dag.eventHandler.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_INIT));
- dag.eventHandler.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_START));
+ dag.eventHandler.handle(new DAGEventStartDag(dag.getID(), null));
return DAGState.NEW;
case INITED:
// DAG inited but not started
@@ -1464,8 +1472,14 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
*/
@Override
public void transition(DAGImpl dag, DAGEvent event) {
+ DAGEventStartDag startEvent = (DAGEventStartDag) event;
dag.startTime = dag.clock.getTime();
dag.logJobHistoryStartedEvent();
+ List<URL> additionalUrlsForClasspath = startEvent.getAdditionalUrlsForClasspath();
+ if (additionalUrlsForClasspath != null) {
+ LOG.info("Added additional resources : [" + additionalUrlsForClasspath + "] to classpath");
+ RelocalizationUtils.addUrlsToClassPath(additionalUrlsForClasspath);
+ }
// TODO Metrics
//job.metrics.runningJob(job);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index aecb5d2..2e07a4d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.common.RuntimeUtils;
import org.apache.tez.dag.api.EdgeManager;
import org.apache.tez.dag.api.EdgeManagerContext;
import org.apache.tez.dag.api.EdgeManagerDescriptor;
@@ -37,7 +38,6 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.runtime.RuntimeUtils;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 6f1398a..8e7bf8d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.common.RuntimeUtils;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.EdgeManagerDescriptor;
@@ -122,7 +123,6 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.runtime.RuntimeUtils;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.OutputCommitterContext;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 6f75481..f4404d5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -28,6 +28,7 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.RuntimeUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.EdgeManagerDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
@@ -42,7 +43,6 @@ import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.runtime.RuntimeUtils;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
index af83dc8..18f2205 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
@@ -21,9 +21,12 @@ package org.apache.tez.dag.history.events;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
@@ -45,16 +48,19 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
private long submitTime;
private DAGProtos.DAGPlan dagPlan;
private ApplicationAttemptId applicationAttemptId;
+ private Map<String, LocalResource> cumulativeAdditionalLocalResources;
public DAGSubmittedEvent() {
}
public DAGSubmittedEvent(TezDAGID dagID, long submitTime,
- DAGProtos.DAGPlan dagPlan, ApplicationAttemptId applicationAttemptId) {
+ DAGProtos.DAGPlan dagPlan, ApplicationAttemptId applicationAttemptId,
+ Map<String, LocalResource> cumulativeAdditionalLocalResources) {
this.dagID = dagID;
this.submitTime = submitTime;
this.dagPlan = dagPlan;
this.applicationAttemptId = applicationAttemptId;
+ this.cumulativeAdditionalLocalResources = cumulativeAdditionalLocalResources;
}
@Override
@@ -129,21 +135,28 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
}
public DAGSubmittedProto toProto() {
- return DAGSubmittedProto.newBuilder()
+ DAGSubmittedProto.Builder builder =DAGSubmittedProto.newBuilder()
.setDagId(dagID.toString())
.setApplicationAttemptId(applicationAttemptId.toString())
.setDagPlan(dagPlan)
- .setSubmitTime(submitTime)
- .build();
-
+ .setSubmitTime(submitTime);
+ if (cumulativeAdditionalLocalResources != null && !cumulativeAdditionalLocalResources.isEmpty()) {
+ builder.setCumulativeAdditionalAmResources(DagTypeConverters
+ .convertFromLocalResources(cumulativeAdditionalLocalResources));
+ }
+ return builder.build();
}
-
+
public void fromProto(DAGSubmittedProto proto) {
this.dagID = TezDAGID.fromString(proto.getDagId());
this.dagPlan = proto.getDagPlan();
this.submitTime = proto.getSubmitTime();
this.applicationAttemptId = ConverterUtils.toApplicationAttemptId(
proto.getApplicationAttemptId());
+ if (proto.hasCumulativeAdditionalAmResources()) {
+ this.cumulativeAdditionalLocalResources = DagTypeConverters.convertFromPlanLocalResources(proto
+ .getCumulativeAdditionalAmResources());
+ }
}
@Override
@@ -198,6 +211,10 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
public ApplicationAttemptId getApplicationAttemptId() {
return applicationAttemptId;
}
+
+ public Map<String, LocalResource> getCumulativeAdditionalLocalResources() {
+ return cumulativeAdditionalLocalResources;
+ }
public long getSubmitTime() {
return submitTime;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
new file mode 100644
index 0000000..7a774b2
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.utils;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.dag.api.TezException;
+
+import com.google.common.collect.Lists;
+
+public class RelocalizationUtils {
+
+ public static List<URL> processAdditionalResources(Map<String, URI> additionalResources,
+ Configuration conf) throws IOException, TezException {
+ if (additionalResources == null || additionalResources.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ List<URL> urls = Lists.newArrayListWithCapacity(additionalResources.size());
+
+ for (Entry<String, URI> lrEntry : additionalResources.entrySet()) {
+ Path dFile = downloadResource(lrEntry.getKey(), lrEntry.getValue(), conf);
+ urls.add(dFile.toUri().toURL());
+ }
+ return urls;
+ }
+
+ public static void addUrlsToClassPath(List<URL> urls) {
+ RuntimeUtils.addResourcesToClasspath(urls);
+ }
+
+ private static Path downloadResource(String destName, URI uri, Configuration conf)
+ throws IOException {
+ FileSystem fs = FileSystem.get(uri, conf);
+ Path cwd = new Path(System.getenv(Environment.PWD.name()));
+ Path dFile = new Path(cwd, destName);
+ Path srcPath = new Path(uri);
+ fs.copyToLocalFile(srcPath, dFile);
+ return dFile.makeQualified(FileSystem.getLocal(conf).getUri(), cwd);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index c640baa..c73870a 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -44,6 +44,7 @@ message DAGSubmittedProto {
optional DAGPlan dag_plan = 2;
optional int64 submit_time = 3;
optional string application_attempt_id = 4;
+ optional PlanLocalResourcesProto cumulative_additional_am_resources = 5;
}
message DAGInitializedProto {
@@ -183,4 +184,4 @@ message SummaryEventProto {
message VertexFinishStateProto {
optional string vertex_id = 1;
optional int32 state = 2;
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 60c7b31..4e790e6 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -77,6 +77,7 @@ import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate.UpdateType;
+import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
import org.apache.tez.dag.app.dag.event.TaskEvent;
@@ -654,7 +655,7 @@ public class TestDAGImpl {
@SuppressWarnings("unchecked")
private void startDAG(DAGImpl impl) {
dispatcher.getEventHandler().handle(
- new DAGEvent(impl.getID(), DAGEventType.DAG_START));
+ new DAGEventStartDag(impl.getID(), null));
dispatcher.await();
Assert.assertEquals(DAGState.RUNNING, impl.getState());
}
@@ -1043,7 +1044,7 @@ public class TestDAGImpl {
@Test(timeout = 5000)
public void testInvalidEvent() {
dispatcher.getEventHandler().handle(
- new DAGEvent(dagId, DAGEventType.DAG_START));
+ new DAGEventStartDag(dagId, null));
dispatcher.await();
Assert.assertEquals(DAGState.ERROR, dag.getState());
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index b9cae70..e533ffd 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.common.RuntimeUtils;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.EdgeManagerDescriptor;
import org.apache.tez.dag.api.VertexLocationHint;
@@ -42,7 +43,6 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
-import org.apache.tez.runtime.RuntimeUtils;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
@@ -141,7 +141,7 @@ public class TestHistoryEventsProtoConversion {
ApplicationId.newInstance(0, 1), 1), 1001l,
DAGPlan.newBuilder().setName("foo").build(),
ApplicationAttemptId.newInstance(
- ApplicationId.newInstance(0, 1), 1));
+ ApplicationId.newInstance(0, 1), 1), null);
DAGSubmittedEvent deserializedEvent = (DAGSubmittedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getApplicationAttemptId(),
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
index 6a215de..6e94c22 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.RuntimeUtils;
import org.apache.tez.dag.api.TezUncheckedException;
import com.google.common.base.Preconditions;
@@ -90,15 +91,11 @@ public class TezGroupedSplitsInputFormat<K, V>
}
}
}
-
+
static Class<?> getClassFromName(String name) {
- try {
- return Class.forName(name);
- } catch (ClassNotFoundException e1) {
- throw new TezUncheckedException(e1);
- }
+ return RuntimeUtils.getClazz(name);
}
-
+
public class TezGroupedSplitsRecordReader implements RecordReader<K, V> {
TezGroupedSplit groupedSplit;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
index 874d9a4..5fa3e79 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.RuntimeUtils;
import org.apache.tez.dag.api.TezUncheckedException;
import com.google.common.base.Preconditions;
@@ -127,11 +128,7 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V>
}
static Class<?> getClassFromName(String name) {
- try {
- return Class.forName(name);
- } catch (ClassNotFoundException e1) {
- throw new TezUncheckedException(e1);
- }
+ return RuntimeUtils.getClazz(name);
}
public class TezGroupedSplitsRecordReader extends RecordReader<K, V> {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index d2d9b62..d211a62 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -42,6 +42,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.common.RuntimeUtils;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeUtils.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeUtils.java
deleted file mode 100644
index d414f31..0000000
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeUtils.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.TezUncheckedException;
-
-public class RuntimeUtils {
-
- private static final Map<String, Class<?>> CLAZZ_CACHE = new ConcurrentHashMap<String, Class<?>>();
-
- @Private
- public static Class<?> getClazz(String className) {
- Class<?> clazz = CLAZZ_CACHE.get(className);
- if (clazz == null) {
- try {
- clazz = Class.forName(className);
- } catch (ClassNotFoundException e) {
- throw new TezUncheckedException("Unable to load class: " + className, e);
- }
- }
- return clazz;
- }
-
- private static <T> T getNewInstance(Class<T> clazz) {
- T instance;
- try {
- instance = clazz.newInstance();
- } catch (InstantiationException e) {
- throw new TezUncheckedException(
- "Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
- } catch (IllegalAccessException e) {
- throw new TezUncheckedException(
- "Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
- }
- return instance;
- }
-
- @Private
- public static <T> T createClazzInstance(String className) {
- Class<?> clazz = getClazz(className);
- @SuppressWarnings("unchecked")
- T instance = (T) getNewInstance(clazz);
- return instance;
- }
-
-
- @Private
- public static synchronized void addResourcesToClasspath(List<URL> urls) {
- ClassLoader classLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]), Thread
- .currentThread().getContextClassLoader());
- Thread.currentThread().setContextClassLoader(classLoader);
- }
-
-
- // Parameters for addResourcesToSystemClassLoader
- private static final Class<?>[] parameters = new Class[]{URL.class};
- private static Method sysClassLoaderMethod = null;
-
-
- @Private
- public static synchronized void addResourcesToSystemClassLoader(List<URL> urls) throws TezException {
- URLClassLoader sysLoader = (URLClassLoader)ClassLoader.getSystemClassLoader();
-
- if (sysClassLoaderMethod == null) {
- Class<?> sysClass = URLClassLoader.class;
- Method method;
- try {
- method = sysClass.getDeclaredMethod("addURL", parameters);
- } catch (SecurityException e) {
- throw new TezException("Failed to get handle on method addURL", e);
- } catch (NoSuchMethodException e) {
- throw new TezException("Failed to get handle on method addURL", e);
- }
- method.setAccessible(true);
- sysClassLoaderMethod = method;
- }
- for (URL url : urls) {
- try {
- sysClassLoaderMethod.invoke(sysLoader, new Object[] { url });
- } catch (IllegalArgumentException e) {
- throw new TezException("Failed to invoke addURL for rsrc: " + url, e);
- } catch (IllegalAccessException e) {
- throw new TezException("Failed to invoke addURL for rsrcs: " + urls, e);
- } catch (InvocationTargetException e) {
- throw new TezException("Failed to invoke addURL for rsrcs: " + urls, e);
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestRuntimeUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestRuntimeUtils.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestRuntimeUtils.java
index 5f76afa..f374cc0 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestRuntimeUtils.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestRuntimeUtils.java
@@ -29,6 +29,7 @@ import java.util.Collections;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.tez.common.RuntimeUtils;
import org.apache.tez.dag.api.TezException;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 334ebb4..2db972f 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.RuntimeUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.EdgeManager;
import org.apache.tez.dag.api.EdgeManagerContext;
@@ -37,7 +38,6 @@ import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.runtime.RuntimeUtils;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index bb71dfe..2c7661f 100644
--- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -21,6 +21,7 @@ package org.apache.tez.mapreduce;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -53,6 +54,7 @@ import org.apache.tez.client.TezClientUtils;
import org.apache.tez.client.TezSession;
import org.apache.tez.client.TezSessionConfiguration;
import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.common.RuntimeUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
@@ -83,7 +85,10 @@ import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
import org.apache.tez.mapreduce.processor.map.MapProcessor;
import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
+import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TezRootInputInitializer;
+import org.apache.tez.runtime.api.TezRootInputInitializerContext;
import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
import org.apache.tez.runtime.library.output.OnFileSortedOutput;
import org.apache.tez.test.MiniTezCluster;
@@ -92,6 +97,7 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+
public class TestMRRJobsDAGApi {
private static final Log LOG = LogFactory.getLog(TestMRRJobsDAGApi.class);
@@ -121,6 +127,7 @@ public class TestMRRJobsDAGApi {
1, 1, 1);
Configuration conf = new Configuration();
conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+ conf.setInt("yarn.nodemanager.delete.debug-delay-sec", 20000);
mrrTezCluster.init(conf);
mrrTezCluster.start();
}
@@ -172,6 +179,92 @@ public class TestMRRJobsDAGApi {
Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
}
+ // Submit 2 DAGs via RPC to the same session using a custom root input initializer. The second
+ // DAG is submitted with an additional local resource (a jar expected to provide
+ // org.apache.tez.test.Test). The initializer creates a marker directory only when it can load
+ // that class, which shows the resource was relocalized into the running AM.
+ @Test(timeout = 120000)
+ public void testAMRelocalization() throws Exception {
+ Map<String, String> commonEnv = createCommonEnv();
+ Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String
+ .valueOf(new Random().nextInt(100000))));
+ remoteFs.mkdirs(remoteStagingDir);
+ TezConfiguration tezConf = new TezConfiguration(
+ mrrTezCluster.getConfig());
+ tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+ remoteStagingDir.toString());
+
+ // The marker path written by MRInputAMSplitGeneratorRelocalizationTest is fixed, so remove
+ // any leftover from an earlier run; otherwise the first assertFalse below fails spuriously.
+ Path markerPath = new Path("/tmp/relocalizationfilefound");
+ remoteFs.delete(markerPath, true);
+
+ Map<String, LocalResource> amLocalResources =
+ new HashMap<String, LocalResource>();
+
+ AMConfiguration amConfig = new AMConfiguration(
+ commonEnv, amLocalResources,
+ tezConf, null);
+ TezSessionConfiguration tezSessionConfig =
+ new TezSessionConfiguration(amConfig, tezConf);
+ TezSession tezSession = new TezSession("testrelocalizationsession", tezSessionConfig);
+ tezSession.start();
+ Assert.assertEquals(TezSessionStatus.INITIALIZING,
+ tezSession.getSessionStatus());
+
+ // First DAG: no additional resources, so the initializer must not be able to load the class.
+ State finalState = testMRRSleepJobDagSubmitCore(true, false, false,
+ tezSession, true, MRInputAMSplitGeneratorRelocalizationTest.class, null);
+ Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
+ Assert.assertFalse(remoteFs.exists(markerPath));
+
+ // Start the second job with some additional resources.
+ Path relocFilePath = remoteFs.makeQualified(new Path("/tmp/test.jar"));
+
+ java.net.URL url = Thread.currentThread().getContextClassLoader().getResource("test_jar");
+ // Fail with a clear message rather than an NPE if the test resource is missing.
+ Assert.assertNotNull("Test resource 'test_jar' not found on the classpath", url);
+ FileSystem localFs = FileSystem.getLocal(tezConf);
+ Path localPath = localFs.makeQualified(new Path(url.toURI()));
+ LOG.info("Copying file from local path: " + localPath);
+ remoteFs.copyFromLocalFile(localPath, relocFilePath);
+
+ Map<String, LocalResource> additionalResources = new HashMap<String, LocalResource>();
+ additionalResources.put("test.jar", LocalResource.newInstance(
+ ConverterUtils.getYarnUrlFromPath(relocFilePath), LocalResourceType.FILE,
+ LocalResourceVisibility.PRIVATE, 0, 0));
+
+ Assert.assertEquals(TezSessionStatus.READY,
+ tezSession.getSessionStatus());
+ finalState = testMRRSleepJobDagSubmitCore(true, false, false,
+ tezSession, true, MRInputAMSplitGeneratorRelocalizationTest.class, additionalResources);
+ Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
+ Assert.assertEquals(TezSessionStatus.READY,
+ tezSession.getSessionStatus());
+ Assert.assertTrue(remoteFs.exists(markerPath));
+
+ ApplicationId appId = tezSession.getApplicationId();
+ tezSession.stop();
+ Assert.assertEquals(TezSessionStatus.SHUTDOWN,
+ tezSession.getSessionStatus());
+
+ YarnClient yarnClient = YarnClient.createYarnClient();
+ yarnClient.init(mrrTezCluster.getConfig());
+ yarnClient.start();
+ try {
+ // Poll until the application reaches a terminal state. Sleep between polls so the loop
+ // does not spin against the ResourceManager; the @Test timeout bounds the total wait.
+ while (true) {
+ YarnApplicationState appState =
+ yarnClient.getApplicationReport(appId).getYarnApplicationState();
+ if (appState == YarnApplicationState.FINISHED
+ || appState == YarnApplicationState.FAILED
+ || appState == YarnApplicationState.KILLED) {
+ break;
+ }
+ Thread.sleep(100);
+ }
+
+ ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+ Assert.assertEquals(YarnApplicationState.FINISHED,
+ appReport.getYarnApplicationState());
+ Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
+ appReport.getFinalApplicationStatus());
+ } finally {
+ // Release the client's RPC resources even when an assertion above fails.
+ yarnClient.stop();
+ }
+ }
+
// Submits a DAG to AM via RPC after AM has started
@Test(timeout = 120000)
public void testMultipleMRRSleepJobViaSession() throws IOException,
@@ -199,12 +292,12 @@ public class TestMRRJobsDAGApi {
tezSession.getSessionStatus());
State finalState = testMRRSleepJobDagSubmitCore(true, false, false,
- tezSession, false);
+ tezSession, false, null, null);
Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
Assert.assertEquals(TezSessionStatus.READY,
tezSession.getSessionStatus());
finalState = testMRRSleepJobDagSubmitCore(true, false, false,
- tezSession, false);
+ tezSession, false, null, null);
Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
Assert.assertEquals(TezSessionStatus.READY,
tezSession.getSessionStatus());
@@ -269,7 +362,7 @@ public class TestMRRJobsDAGApi {
InterruptedException, TezException, ClassNotFoundException,
YarnException {
return testMRRSleepJobDagSubmitCore(dagViaRPC, killDagWhileRunning,
- closeSessionBeforeSubmit, null, genSplitsInAM);
+ closeSessionBeforeSubmit, null, genSplitsInAM, null, null);
}
private Map<String, String> createCommonEnv() {
@@ -282,7 +375,9 @@ public class TestMRRJobsDAGApi {
boolean killDagWhileRunning,
boolean closeSessionBeforeSubmit,
TezSession reUseTezSession,
- boolean genSplitsInAM) throws IOException,
+ boolean genSplitsInAM,
+ Class<? extends TezRootInputInitializer> initializerClass,
+ Map<String, LocalResource> additionalLocalResources) throws IOException,
InterruptedException, TezException, ClassNotFoundException,
YarnException {
LOG.info("\n\n\nStarting testMRRSleepJobDagSubmit().");
@@ -358,8 +453,10 @@ public class TestMRRJobsDAGApi {
DAG dag = new DAG("testMRRSleepJobDagSubmit");
int stage1NumTasks = genSplitsInAM ? -1 : inputSplitInfo.getNumTasks();
- Class<? extends TezRootInputInitializer> inputInitializerClazz = genSplitsInAM ? MRInputAMSplitGenerator.class
+ Class<? extends TezRootInputInitializer> inputInitializerClazz =
+ genSplitsInAM ? (initializerClass == null ? MRInputAMSplitGenerator.class : initializerClass)
: null;
+ LOG.info("Using initializer class: " + initializerClass);
Vertex stage1Vertex = new Vertex("map", new ProcessorDescriptor(
MapProcessor.class.getName()).setUserPayload(stage1Payload),
stage1NumTasks, Resource.newInstance(256, 1));
@@ -496,7 +593,7 @@ public class TestMRRJobsDAGApi {
if(dagViaRPC) {
LOG.info("Submitting dag to tez session with appId="
+ tezSession.getApplicationId());
- dagClient = tezSession.submitDAG(dag);
+ dagClient = tezSession.submitDAG(dag, additionalLocalResources);
Assert.assertEquals(TezSessionStatus.RUNNING,
tezSession.getSessionStatus());
}
@@ -556,4 +653,26 @@ public class TestMRRJobsDAGApi {
throw new TezUncheckedException("VertexGroups Test Failed");
}
}
+
+ /**
+ * Root input initializer used by testAMRelocalization. Before delegating to
+ * MRInputAMSplitGenerator, it tries to load org.apache.tez.test.Test. If that succeeds, it
+ * creates the marker directory /tmp/relocalizationfilefound on the FileSystem built from the
+ * input's configuration payload, so the test can observe whether the class was visible to the AM.
+ *
+ * This class should not be used by more than one test in a single run, since the marker path
+ * it writes to is not dynamic.
+ */
+ public static class MRInputAMSplitGeneratorRelocalizationTest extends MRInputAMSplitGenerator {
+ @Override
+ public List<Event> initialize(TezRootInputInitializerContext rootInputContext) throws Exception {
+ // Guard only the class probe, so that failures while writing the marker are not mistaken
+ // for "class not found".
+ boolean classFound;
+ try {
+ RuntimeUtils.getClazz("org.apache.tez.test.Test");
+ classFound = true;
+ } catch (TezUncheckedException e) {
+ // Expected when the jar providing the class has not been added to the AM.
+ classFound = false;
+ }
+
+ if (classFound) {
+ LOG.info("Class found");
+ MRInputUserPayloadProto userPayloadProto = MRHelpers
+ .parseMRInputPayload(rootInputContext.getUserPayload());
+ Configuration conf = MRHelpers.createConfFromByteString(userPayloadProto
+ .getConfigurationBytes());
+ FileSystem fs = FileSystem.get(conf);
+ fs.mkdirs(new Path("/tmp/relocalizationfilefound"));
+ } else {
+ LOG.info("Class not found");
+ }
+
+ return super.initialize(rootInputContext);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index ebbc3c1..dd2fd75 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -141,6 +141,5 @@ public class TestTezJobs {
assertNotNull(dagStatus.getDAGCounters().getGroup(FileSystemCounter.class.getName()));
assertNotNull(dagStatus.getDAGCounters().findCounter(TaskCounter.GC_TIME_MILLIS));
ExampleDriver.printDAGStatus(dagClient, new String[] { "SleepVertex" }, true, true);
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-tests/src/test/resources/test_jar
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/resources/test_jar b/tez-tests/src/test/resources/test_jar
new file mode 100644
index 0000000..47414a2
Binary files /dev/null and b/tez-tests/src/test/resources/test_jar differ