You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by sr...@apache.org on 2016/01/20 18:05:35 UTC
[09/50] [abbrv] tez git commit: TEZ-3025. InputInitializer creation
should use the dag ugi. (sseth)
TEZ-3025. InputInitializer creation should use the dag ugi. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d5c9649e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d5c9649e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d5c9649e
Branch: refs/heads/TEZ-2980
Commit: d5c9649e5849021dd1bfdb796d821f7a1524aaf0
Parents: 9816a49
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Jan 7 17:38:40 2016 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Jan 7 17:38:40 2016 -0800
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../app/dag/RootInputInitializerManager.java | 32 +++++++--
.../dag/TestRootInputInitializerManager.java | 71 ++++++++++++++++++++
3 files changed, 98 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/d5c9649e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ce35fd1..5e944b6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@ INCOMPATIBLE CHANGES
TEZ-2972. Avoid task rescheduling when a node turns unhealthy
ALL CHANGES:
+ TEZ-3025. InputInitializer creation should use the dag ugi.
TEZ-3017. HistoryACLManager does not have a close method for cleanup
TEZ-2914. Ability to limit vertex concurrency
TEZ-3011. Link Vertex Name in Dag Tasks/Task Attempts to Vertex
@@ -301,6 +302,7 @@ INCOMPATIBLE CHANGES
TEZ-2949. Allow duplicate dag names within session for Tez.
ALL CHANGES
+ TEZ-3025. InputInitializer creation should use the dag ugi.
TEZ-3017. HistoryACLManager does not have a close method for cleanup
TEZ-2914. Ability to limit vertex concurrency
TEZ-2918. Make progress notifications in IOs
http://git-wip-us.apache.org/repos/asf/tez/blob/d5c9649e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index 57a7172..e03b469 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.app.dag;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
@@ -107,7 +108,7 @@ public class RootInputInitializerManager {
this.entityStateTracker = stateTracker;
}
- public void runInputInitializers(List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
+ public void runInputInitializers(List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
inputs) throws TezException {
for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input : inputs) {
@@ -141,12 +142,29 @@ public class RootInputInitializerManager {
}
@VisibleForTesting
- protected InputInitializer createInitializer(RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>
- input, InputInitializerContext context) throws TezException {
- InputInitializer initializer = ReflectionUtils
- .createClazzInstance(input.getControllerDescriptor().getClassName(),
- new Class[]{InputInitializerContext.class}, new Object[]{context});
- return initializer;
+  // Builds the InputInitializer named by the input's controller descriptor, running the
+  // reflective construction inside dagUgi.doAs so the constructor executes as the DAG user.
+  //
+  // input:   root input whose controller descriptor supplies the initializer class name.
+  // context: passed as the single InputInitializerContext constructor argument.
+  // Returns the newly constructed initializer.
+  // Throws TezException when doAs fails with an IOException or InterruptedException, or when
+  // the failure surfaced by doAs has a TezException as its cause.
+  protected InputInitializer createInitializer(final RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>
+      input, final InputInitializerContext context) throws TezException {
+    try {
+      return dagUgi.doAs(new PrivilegedExceptionAction<InputInitializer>() {
+        @Override
+        public InputInitializer run() throws Exception {
+          InputInitializer initializer = ReflectionUtils
+              .createClazzInstance(input.getControllerDescriptor().getClassName(),
+                  new Class[]{InputInitializerContext.class}, new Object[]{context});
+          return initializer;
+        }
+      });
+    } catch (IOException e) {
+      throw new TezException(e);
+    } catch (InterruptedException e) {
+      // Re-assert the interrupt flag before translating the exception; otherwise the
+      // interruption is lost to every caller that only sees the TezException.
+      Thread.currentThread().interrupt();
+      throw new TezException(e);
+    } catch (UndeclaredThrowableException e) {
+      // Surface a wrapped TezException directly; any other wrapped failure is rethrown as-is.
+      if (e.getCause() instanceof TezException) {
+        throw (TezException) e.getCause();
+      } else {
+        throw e;
+      }
+    }
+  }
public void handleInitializerEvents(List<TezEvent> events) {
http://git-wip-us.apache.org/repos/asf/tez/blob/d5c9649e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java
index 89eb2a6..b79b4af 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java
@@ -25,19 +25,26 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import com.google.common.collect.Lists;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.RootInputLeafOutput;
+import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.hadoop.shim.DefaultHadoopShim;
+import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputInitializer;
import org.apache.tez.runtime.api.InputInitializerContext;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
@@ -198,4 +205,68 @@ public class TestRootInputInitializerManager {
verify(initializer, never()).handleInputInitializerEvent(any(List.class));
}
+
+
+  // Checks that an InputInitializer sees the DAG UGI both in its constructor and in initialize().
+  @Test (timeout = 5000)
+  public void testCorrectUgiUsage() throws TezException, InterruptedException {
+    // The identity the manager is expected to run initializers as.
+    UserGroupInformation dagUgi = UserGroupInformation.createRemoteUser("fakeuser");
+
+    // Collaborators handed to the manager.
+    AppContext appContext = mock(AppContext.class);
+    doReturn(new DefaultHadoopShim()).when(appContext).getHadoopShim();
+    doReturn(mock(EventHandler.class)).when(appContext).getEventHandler();
+    Vertex vertex = mock(Vertex.class);
+    doReturn(mock(TezVertexID.class)).when(vertex).getVertexId();
+    StateChangeNotifier notifier = mock(StateChangeNotifier.class);
+
+    RootInputInitializerManager manager =
+        new RootInputInitializerManager(vertex, appContext, dagUgi, notifier);
+
+    // A single root input whose initializer records the UGI it runs under.
+    InputDescriptor inputDescriptor = mock(InputDescriptor.class);
+    InputInitializerDescriptor initializerDescriptor =
+        InputInitializerDescriptor.create(InputInitializerForUgiTest.class.getName());
+    RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> rootInput =
+        new RootInputLeafOutput<>("InputName", inputDescriptor, initializerDescriptor);
+
+    manager.runInputInitializers(Collections.singletonList(rootInput));
+
+    // Block until initialize() has recorded its UGI before asserting.
+    InputInitializerForUgiTest.awaitInitialize();
+
+    assertEquals(dagUgi, InputInitializerForUgiTest.ctorUgi);
+    assertEquals(dagUgi, InputInitializerForUgiTest.initializeUgi);
+  }
+
+  /**
+   * Test-only initializer that records the current UGI observed in its constructor and in
+   * {@code initialize()}, and lets a test wait until {@code initialize()} has been invoked.
+   */
+  public static class InputInitializerForUgiTest extends InputInitializer {
+
+    // UGI captured by the constructor and by initialize(), respectively.
+    static volatile UserGroupInformation ctorUgi;
+    static volatile UserGroupInformation initializeUgi;
+
+    // Set once initialize() has run; read and written only while holding initializeSync.
+    static boolean initialized = false;
+    static final Object initializeSync = new Object();
+
+    public InputInitializerForUgiTest(InputInitializerContext initializerContext) {
+      super(initializerContext);
+      final UserGroupInformation currentUser;
+      try {
+        currentUser = UserGroupInformation.getCurrentUser();
+      } catch (IOException ioe) {
+        throw new RuntimeException(ioe);
+      }
+      ctorUgi = currentUser;
+    }
+
+    /** Records the caller's UGI, signals a waiter, and produces no events. */
+    @Override
+    public List<Event> initialize() throws Exception {
+      final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+      initializeUgi = currentUser;
+      synchronized (initializeSync) {
+        initialized = true;
+        initializeSync.notify();
+      }
+      return null;
+    }
+
+    /** Intentionally a no-op; this initializer ignores incoming events. */
+    @Override
+    public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
+    }
+
+    /** Blocks the calling thread until {@code initialize()} has set the initialized flag. */
+    static void awaitInitialize() throws InterruptedException {
+      synchronized (initializeSync) {
+        for (;;) {
+          if (initialized) {
+            return;
+          }
+          initializeSync.wait();
+        }
+      }
+    }
+  }
}