You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by sr...@apache.org on 2016/01/20 18:05:35 UTC

[09/50] [abbrv] tez git commit: TEZ-3025. InputInitializer creation should use the dag ugi. (sseth)

TEZ-3025. InputInitializer creation should use the dag ugi. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d5c9649e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d5c9649e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d5c9649e

Branch: refs/heads/TEZ-2980
Commit: d5c9649e5849021dd1bfdb796d821f7a1524aaf0
Parents: 9816a49
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Jan 7 17:38:40 2016 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Jan 7 17:38:40 2016 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../app/dag/RootInputInitializerManager.java    | 32 +++++++--
 .../dag/TestRootInputInitializerManager.java    | 71 ++++++++++++++++++++
 3 files changed, 98 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/d5c9649e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ce35fd1..5e944b6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@ INCOMPATIBLE CHANGES
   TEZ-2972. Avoid task rescheduling when a node turns unhealthy
 
 ALL CHANGES:
+  TEZ-3025. InputInitializer creation should use the dag ugi.
   TEZ-3017. HistoryACLManager does not have a close method for cleanup
   TEZ-2914. Ability to limit vertex concurrency
   TEZ-3011. Link Vertex Name in Dag Tasks/Task Attempts to Vertex
@@ -301,6 +302,7 @@ INCOMPATIBLE CHANGES
   TEZ-2949. Allow duplicate dag names within session for Tez.
 
 ALL CHANGES
+  TEZ-3025. InputInitializer creation should use the dag ugi.
   TEZ-3017. HistoryACLManager does not have a close method for cleanup
   TEZ-2914. Ability to limit vertex concurrency
   TEZ-2918. Make progress notifications in IOs

http://git-wip-us.apache.org/repos/asf/tez/blob/d5c9649e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index 57a7172..e03b469 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.app.dag;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
@@ -107,7 +108,7 @@ public class RootInputInitializerManager {
     this.entityStateTracker = stateTracker;
   }
   
-  public void runInputInitializers(List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> 
+  public void runInputInitializers(List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
       inputs) throws TezException {
     for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input : inputs) {
 
@@ -141,12 +142,29 @@ public class RootInputInitializerManager {
   }
 
   @VisibleForTesting
-  protected InputInitializer createInitializer(RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>
-      input, InputInitializerContext context) throws TezException {
-    InputInitializer initializer = ReflectionUtils
-        .createClazzInstance(input.getControllerDescriptor().getClassName(),
-            new Class[]{InputInitializerContext.class}, new Object[]{context});
-    return initializer;
+  protected InputInitializer createInitializer(final RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>
+      input, final InputInitializerContext context) throws TezException {
+    try {
+      // Instantiate inside dagUgi.doAs so the initializer's constructor runs as the DAG user.
+      return dagUgi.doAs(new PrivilegedExceptionAction<InputInitializer>() {
+        @Override
+        public InputInitializer run() throws Exception {
+          return ReflectionUtils
+              .createClazzInstance(input.getControllerDescriptor().getClassName(),
+                  new Class[]{InputInitializerContext.class}, new Object[]{context});
+        }
+      });
+    } catch (IOException e) {
+      throw new TezException(e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // keep the interrupt visible to callers up the stack
+      throw new TezException(e);
+    } catch (UndeclaredThrowableException e) { // doAs wraps other checked exceptions in this type
+      if (e.getCause() instanceof TezException) {
+        throw (TezException) e.getCause();
+      }
+      throw e;
+    }
   }
 
   public void handleInitializerEvents(List<TezEvent> events) {

http://git-wip-us.apache.org/repos/asf/tez/blob/d5c9649e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java
index 89eb2a6..b79b4af 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java
@@ -25,19 +25,26 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 
 import com.google.common.collect.Lists;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.RootInputLeafOutput;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.hadoop.shim.DefaultHadoopShim;
+import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.InputInitializer;
 import org.apache.tez.runtime.api.InputInitializerContext;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
@@ -198,4 +205,68 @@ public class TestRootInputInitializerManager {
 
     verify(initializer, never()).handleInputInitializerEvent(any(List.class));
   }
+
+
+  @Test (timeout = 5000)
+  public void testCorrectUgiUsage() throws TezException, InterruptedException {
+    // Identity handed to the manager; both assertions below compare against it.
+    final UserGroupInformation dagUgi = UserGroupInformation.createRemoteUser("fakeuser");
+    final AppContext mockAppContext = mock(AppContext.class);
+    doReturn(mock(EventHandler.class)).when(mockAppContext).getEventHandler();
+    doReturn(new DefaultHadoopShim()).when(mockAppContext).getHadoopShim();
+    final Vertex mockVertex = mock(Vertex.class);
+    doReturn(mock(TezVertexID.class)).when(mockVertex).getVertexId();
+    final RootInputInitializerManager manager = new RootInputInitializerManager(
+        mockVertex, mockAppContext, dagUgi, mock(StateChangeNotifier.class));
+
+    final InputInitializerDescriptor initializerDescriptor =
+        InputInitializerDescriptor.create(InputInitializerForUgiTest.class.getName());
+    final RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> rootInput =
+        new RootInputLeafOutput<>("InputName", mock(InputDescriptor.class), initializerDescriptor);
+    manager.runInputInitializers(Collections.singletonList(rootInput));
+    // Block until initialize() has run and recorded the UGI it executed under.
+    InputInitializerForUgiTest.awaitInitialize();
+
+    assertEquals(dagUgi, InputInitializerForUgiTest.ctorUgi);
+    assertEquals(dagUgi, InputInitializerForUgiTest.initializeUgi);
+  }
+
+  /** Test initializer that records the UGI seen by its constructor and by initialize(). */
+  public static class InputInitializerForUgiTest extends InputInitializer {
+    static volatile UserGroupInformation ctorUgi;
+    static volatile UserGroupInformation initializeUgi;
+    // Set under initializeSync once initialize() has run; awaitInitialize() waits on it.
+    static boolean initialized = false;
+    static final Object initializeSync = new Object();
+
+    public InputInitializerForUgiTest(InputInitializerContext initializerContext) {
+      super(initializerContext);
+      try {
+        ctorUgi = UserGroupInformation.getCurrentUser();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public List<Event> initialize() throws Exception {
+      initializeUgi = UserGroupInformation.getCurrentUser();
+      synchronized (initializeSync) {
+        initialized = true;
+        initializeSync.notifyAll(); // wake every waiter; notify() would release only one
+      }
+      return null;
+    }
+
+    @Override
+    public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
+    }
+
+    static void awaitInitialize() throws InterruptedException {
+      synchronized (initializeSync) {
+        while (!initialized) { // loop so a spurious wakeup re-checks the flag
+          initializeSync.wait();
+        }
+      }
+    }
+  }
 }