You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ss...@apache.org on 2015/07/23 07:27:04 UTC

hive git commit: HIVE-11350. Fix API usage in plugins to match changes made in TEZ-2005. (Siddharth Seth)

Repository: hive
Updated Branches:
  refs/heads/llap d701cd6de -> 5cd092b8b


HIVE-11350. Fix API usage in plugins to match changes made in TEZ-2005. (Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5cd092b8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5cd092b8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5cd092b8

Branch: refs/heads/llap
Commit: 5cd092b8beba4a9bc866a8692e2265ff35ae9b2a
Parents: d701cd6
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Jul 22 22:26:32 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Jul 22 22:26:32 2015 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/app/rm/ContainerFactory.java |  9 ++--
 .../dag/app/rm/LlapTaskSchedulerService.java    | 46 ++++++++----------
 .../app/rm/TestLlapTaskSchedulerService.java    | 51 +++++++++-----------
 3 files changed, 46 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/5cd092b8/llap-server/src/java/org/apache/tez/dag/app/rm/ContainerFactory.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/tez/dag/app/rm/ContainerFactory.java b/llap-server/src/java/org/apache/tez/dag/app/rm/ContainerFactory.java
index 1ab3d15..89b7198 100644
--- a/llap-server/src/java/org/apache/tez/dag/app/rm/ContainerFactory.java
+++ b/llap-server/src/java/org/apache/tez/dag/app/rm/ContainerFactory.java
@@ -23,20 +23,17 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.dag.app.AppContext;
 
 class ContainerFactory {
   final ApplicationAttemptId customAppAttemptId;
   AtomicLong nextId;
 
-  public ContainerFactory(AppContext appContext, long appIdLong) {
+  public ContainerFactory(ApplicationAttemptId appAttemptId, long appIdLong) {
     this.nextId = new AtomicLong(1);
     ApplicationId appId =
-        ApplicationId.newInstance(appIdLong, appContext.getApplicationAttemptId()
-            .getApplicationId().getId());
+        ApplicationId.newInstance(appIdLong, appAttemptId.getApplicationId().getId());
     this.customAppAttemptId =
-        ApplicationAttemptId.newInstance(appId, appContext.getApplicationAttemptId()
-            .getAttemptId());
+        ApplicationAttemptId.newInstance(appId, appAttemptId.getAttemptId());
   }
 
   public Container createContainer(Resource capability, Priority priority, String hostname,

http://git-wip-us.apache.org/repos/asf/hive/blob/5cd092b8/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
index 17b17ee..b6ee3d8 100644
--- a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
+++ b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
@@ -60,7 +60,6 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.util.Clock;
-import org.apache.tez.dag.app.AppContext;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -68,16 +67,18 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.TaskScheduler;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class LlapTaskSchedulerService extends TaskSchedulerService {
+public class LlapTaskSchedulerService extends TaskScheduler {
 
   private static final Logger LOG = LoggerFactory.getLogger(LlapTaskSchedulerService.class);
 
-  private final ExecutorService appCallbackExecutor;
-  private final TaskSchedulerAppCallback appClientDelegate;
+  private final Configuration conf;
 
   // interface into the registry service
   private ServiceInstanceSet activeInstances;
@@ -150,16 +151,17 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
   @VisibleForTesting
   StatsPerDag dagStats = new StatsPerDag();
 
-  public LlapTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext appContext,
-      String clientHostname, int clientPort, String trackingUrl, long customAppIdIdentifier,
-      Configuration conf) {
-    // Accepting configuration here to allow setting up fields as final
+  public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) {
+    this(taskSchedulerContext, new SystemClock());
+  }
 
-    super(LlapTaskSchedulerService.class.getName());
-    this.appCallbackExecutor = createAppCallbackExecutorService();
-    this.appClientDelegate = createAppCallbackDelegate(appClient);
-    this.clock = appContext.getClock();
-    this.containerFactory = new ContainerFactory(appContext, customAppIdIdentifier);
+  @VisibleForTesting
+  public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock clock) {
+    super(taskSchedulerContext);
+    this.clock = clock;
+    this.conf = taskSchedulerContext.getInitialConfiguration();
+    this.containerFactory = new ContainerFactory(taskSchedulerContext.getApplicationAttemptId(),
+        taskSchedulerContext.getCustomClusterIdentifier());
     this.memoryPerInstance =
         conf.getInt(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB,
             LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT);
@@ -206,12 +208,12 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
   }
 
   @Override
-  public void serviceInit(Configuration conf) {
+  public void initialize() {
     registry.init(conf);
   }
 
   @Override
-  public void serviceStart() throws IOException {
+  public void start() throws IOException {
     writeLock.lock();
     try {
       nodeEnablerFuture = nodeEnabledExecutor.submit(nodeEnablerCallable);
@@ -249,7 +251,7 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
   }
 
   @Override
-  public void serviceStop() {
+  public void shutdown() {
     writeLock.lock();
     try {
       if (!this.isStopped.getAndSet(true)) {
@@ -268,7 +270,6 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
         if (registry != null) {
           registry.stop();
         }
-        appCallbackExecutor.shutdownNow();
       }
     } finally {
       writeLock.unlock();
@@ -479,7 +480,7 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
     } finally {
       writeLock.unlock();
     }
-    appClientDelegate.containerBeingReleased(taskInfo.containerId);
+    getContext().containerBeingReleased(taskInfo.containerId);
     return true;
   }
 
@@ -507,11 +508,6 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
         .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build());
   }
 
-  @VisibleForTesting
-  TaskSchedulerAppCallback createAppCallbackDelegate(TaskSchedulerAppCallback realAppClient) {
-    return new TaskSchedulerAppCallbackWrapper(realAppClient, appCallbackExecutor);
-  }
-
   /**
    * @param request the list of preferred hosts. null implies any host
    * @return
@@ -791,7 +787,7 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
         writeLock.unlock();
       }
 
-      appClientDelegate.taskAllocated(taskInfo.task, taskInfo.clientCookie, container);
+      getContext().taskAllocated(taskInfo.task, taskInfo.clientCookie, container);
       return true;
     }
   }
@@ -842,7 +838,7 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
     if (preemptedTaskList != null) {
       for (TaskInfo taskInfo : preemptedTaskList) {
         LOG.info("DBG: Preempting task {}", taskInfo);
-        appClientDelegate.preemptContainer(taskInfo.containerId);
+        getContext().preemptContainer(taskInfo.containerId);
       }
     }
     // The schedule loop will be triggered again when the deallocateTask request comes in for the

http://git-wip-us.apache.org/repos/asf/hive/blob/5cd092b8/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
index b1cd15e..245c140 100644
--- a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
+++ b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -40,11 +41,11 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ControlledClock;
-import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
@@ -57,7 +58,7 @@ public class TestLlapTaskSchedulerService {
   private static final String HOST3 = "host3";
 
   @Test (timeout = 5000)
-  public void testSimpleLocalAllocation() {
+  public void testSimpleLocalAllocation() throws IOException {
 
     TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper();
 
@@ -79,7 +80,7 @@ public class TestLlapTaskSchedulerService {
   }
 
   @Test (timeout = 5000)
-  public void testSimpleNoLocalityAllocation() {
+  public void testSimpleNoLocalityAllocation() throws IOException {
     TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper();
 
     try {
@@ -101,7 +102,7 @@ public class TestLlapTaskSchedulerService {
   // task triggers the next task to be scheduled.
 
   @Test(timeout=5000)
-  public void testPreemption() throws InterruptedException {
+  public void testPreemption() throws InterruptedException, IOException {
 
     Priority priority1 = Priority.newInstance(1);
     Priority priority2 = Priority.newInstance(2);
@@ -155,7 +156,7 @@ public class TestLlapTaskSchedulerService {
   }
 
   @Test(timeout=5000)
-  public void testNodeDisabled() {
+  public void testNodeDisabled() throws IOException {
     TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(10000l);
     try {
       Priority priority1 = Priority.newInstance(1);
@@ -200,7 +201,7 @@ public class TestLlapTaskSchedulerService {
   }
 
   // Flaky test disabled @Test(timeout=5000)
-  public void testNodeReEnabled() throws InterruptedException {
+  public void testNodeReEnabled() throws InterruptedException, IOException {
     // Based on actual timing.
     TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(1000l);
     try {
@@ -274,22 +275,22 @@ public class TestLlapTaskSchedulerService {
   private static class TestTaskSchedulerServiceWrapper {
     static final Resource resource = Resource.newInstance(1024, 1);
     Configuration conf;
-    TaskSchedulerAppCallback mockAppCallback = mock(TaskSchedulerAppCallback.class);
-    AppContext mockAppContext = mock(AppContext.class);
+    TaskSchedulerContext mockAppCallback = mock(TaskSchedulerContext.class);
     ControlledClock clock = new ControlledClock(new SystemClock());
     ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1000, 1), 1);
     LlapTaskSchedulerServiceForTest ts;
 
-    TestTaskSchedulerServiceWrapper() {
+    TestTaskSchedulerServiceWrapper() throws IOException {
       this(2000l);
     }
 
-    TestTaskSchedulerServiceWrapper(long disableTimeoutMillis) {
+    TestTaskSchedulerServiceWrapper(long disableTimeoutMillis) throws IOException {
       this(disableTimeoutMillis, new String[]{HOST1, HOST2, HOST3}, 4,
           LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE_DEFAULT);
     }
 
-    TestTaskSchedulerServiceWrapper(long disableTimeoutMillis, String[] hosts, int numExecutors, int waitQueueSize) {
+    TestTaskSchedulerServiceWrapper(long disableTimeoutMillis, String[] hosts, int numExecutors, int waitQueueSize) throws
+        IOException {
       conf = new Configuration();
       conf.setStrings(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS, hosts);
       conf.setInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, numExecutors);
@@ -298,12 +299,13 @@ public class TestLlapTaskSchedulerService {
           disableTimeoutMillis);
       conf.setBoolean(LlapTaskSchedulerServiceForTest.LLAP_TASK_SCHEDULER_IN_TEST, true);
 
-      doReturn(clock).when(mockAppContext).getClock();
-      doReturn(appAttemptId).when(mockAppContext).getApplicationAttemptId();
+      doReturn(appAttemptId).when(mockAppCallback).getApplicationAttemptId();
+      doReturn(11111l).when(mockAppCallback).getCustomClusterIdentifier();
+      doReturn(conf).when(mockAppCallback).getInitialConfiguration();
 
-      ts = new LlapTaskSchedulerServiceForTest(mockAppCallback, mockAppContext, null, 0, null, 11111, conf);
+      ts = new LlapTaskSchedulerServiceForTest(mockAppCallback, clock);
 
-      ts.init(conf);
+      ts.initialize();
       ts.start();
       // One shceduler pass from the nodes that are added at startup
       awaitSchedulerRunNumber(1);
@@ -329,7 +331,7 @@ public class TestLlapTaskSchedulerService {
     }
 
     void shutdown() {
-      ts.stop();
+      ts.shutdown();
     }
 
     void allocateTask(Object task, String[] hosts, Priority priority, Object clientCookie) {
@@ -358,18 +360,9 @@ public class TestLlapTaskSchedulerService {
     private boolean forTestSchedulerGoSignal = false;
 
     public LlapTaskSchedulerServiceForTest(
-        TaskSchedulerAppCallback appClient, AppContext appContext, String clientHostname,
-        int clientPort, String trackingUrl, long customAppIdIdentifier,
-        Configuration conf) {
-      super(appClient, appContext, clientHostname, clientPort, trackingUrl, customAppIdIdentifier,
-          conf);
-      this.inTest = conf.getBoolean(LLAP_TASK_SCHEDULER_IN_TEST, false);
-    }
-
-    @Override
-    TaskSchedulerAppCallback createAppCallbackDelegate(
-        TaskSchedulerAppCallback realAppClient) {
-      return realAppClient;
+        TaskSchedulerContext appClient, Clock clock) {
+      super(appClient, clock);
+      this.inTest = appClient.getInitialConfiguration().getBoolean(LLAP_TASK_SCHEDULER_IN_TEST, false);
     }
 
     protected void schedulePendingTasks() {