You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ss...@apache.org on 2015/07/23 07:27:04 UTC
hive git commit: HIVE-11350. Fix API usage in plugins to match
changes made in TEZ-2005. (Siddharth Seth)
Repository: hive
Updated Branches:
refs/heads/llap d701cd6de -> 5cd092b8b
HIVE-11350. Fix API usage in plugins to match changes made in TEZ-2005. (Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5cd092b8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5cd092b8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5cd092b8
Branch: refs/heads/llap
Commit: 5cd092b8beba4a9bc866a8692e2265ff35ae9b2a
Parents: d701cd6
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Jul 22 22:26:32 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Jul 22 22:26:32 2015 -0700
----------------------------------------------------------------------
.../apache/tez/dag/app/rm/ContainerFactory.java | 9 ++--
.../dag/app/rm/LlapTaskSchedulerService.java | 46 ++++++++----------
.../app/rm/TestLlapTaskSchedulerService.java | 51 +++++++++-----------
3 files changed, 46 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/5cd092b8/llap-server/src/java/org/apache/tez/dag/app/rm/ContainerFactory.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/tez/dag/app/rm/ContainerFactory.java b/llap-server/src/java/org/apache/tez/dag/app/rm/ContainerFactory.java
index 1ab3d15..89b7198 100644
--- a/llap-server/src/java/org/apache/tez/dag/app/rm/ContainerFactory.java
+++ b/llap-server/src/java/org/apache/tez/dag/app/rm/ContainerFactory.java
@@ -23,20 +23,17 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.dag.app.AppContext;
class ContainerFactory {
final ApplicationAttemptId customAppAttemptId;
AtomicLong nextId;
- public ContainerFactory(AppContext appContext, long appIdLong) {
+ public ContainerFactory(ApplicationAttemptId appAttemptId, long appIdLong) {
this.nextId = new AtomicLong(1);
ApplicationId appId =
- ApplicationId.newInstance(appIdLong, appContext.getApplicationAttemptId()
- .getApplicationId().getId());
+ ApplicationId.newInstance(appIdLong, appAttemptId.getApplicationId().getId());
this.customAppAttemptId =
- ApplicationAttemptId.newInstance(appId, appContext.getApplicationAttemptId()
- .getAttemptId());
+ ApplicationAttemptId.newInstance(appId, appAttemptId.getAttemptId());
}
public Container createContainer(Resource capability, Priority priority, String hostname,
http://git-wip-us.apache.org/repos/asf/hive/blob/5cd092b8/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
index 17b17ee..b6ee3d8 100644
--- a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
+++ b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
@@ -60,7 +60,6 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Clock;
-import org.apache.tez.dag.app.AppContext;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -68,16 +67,18 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.TaskScheduler;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class LlapTaskSchedulerService extends TaskSchedulerService {
+public class LlapTaskSchedulerService extends TaskScheduler {
private static final Logger LOG = LoggerFactory.getLogger(LlapTaskSchedulerService.class);
- private final ExecutorService appCallbackExecutor;
- private final TaskSchedulerAppCallback appClientDelegate;
+ private final Configuration conf;
// interface into the registry service
private ServiceInstanceSet activeInstances;
@@ -150,16 +151,17 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
@VisibleForTesting
StatsPerDag dagStats = new StatsPerDag();
- public LlapTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext appContext,
- String clientHostname, int clientPort, String trackingUrl, long customAppIdIdentifier,
- Configuration conf) {
- // Accepting configuration here to allow setting up fields as final
+ public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) {
+ this(taskSchedulerContext, new SystemClock());
+ }
- super(LlapTaskSchedulerService.class.getName());
- this.appCallbackExecutor = createAppCallbackExecutorService();
- this.appClientDelegate = createAppCallbackDelegate(appClient);
- this.clock = appContext.getClock();
- this.containerFactory = new ContainerFactory(appContext, customAppIdIdentifier);
+ @VisibleForTesting
+ public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock clock) {
+ super(taskSchedulerContext);
+ this.clock = clock;
+ this.conf = taskSchedulerContext.getInitialConfiguration();
+ this.containerFactory = new ContainerFactory(taskSchedulerContext.getApplicationAttemptId(),
+ taskSchedulerContext.getCustomClusterIdentifier());
this.memoryPerInstance =
conf.getInt(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB,
LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT);
@@ -206,12 +208,12 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
}
@Override
- public void serviceInit(Configuration conf) {
+ public void initialize() {
registry.init(conf);
}
@Override
- public void serviceStart() throws IOException {
+ public void start() throws IOException {
writeLock.lock();
try {
nodeEnablerFuture = nodeEnabledExecutor.submit(nodeEnablerCallable);
@@ -249,7 +251,7 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
}
@Override
- public void serviceStop() {
+ public void shutdown() {
writeLock.lock();
try {
if (!this.isStopped.getAndSet(true)) {
@@ -268,7 +270,6 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
if (registry != null) {
registry.stop();
}
- appCallbackExecutor.shutdownNow();
}
} finally {
writeLock.unlock();
@@ -479,7 +480,7 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
} finally {
writeLock.unlock();
}
- appClientDelegate.containerBeingReleased(taskInfo.containerId);
+ getContext().containerBeingReleased(taskInfo.containerId);
return true;
}
@@ -507,11 +508,6 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
.setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build());
}
- @VisibleForTesting
- TaskSchedulerAppCallback createAppCallbackDelegate(TaskSchedulerAppCallback realAppClient) {
- return new TaskSchedulerAppCallbackWrapper(realAppClient, appCallbackExecutor);
- }
-
/**
* @param request the list of preferred hosts. null implies any host
* @return
@@ -791,7 +787,7 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
writeLock.unlock();
}
- appClientDelegate.taskAllocated(taskInfo.task, taskInfo.clientCookie, container);
+ getContext().taskAllocated(taskInfo.task, taskInfo.clientCookie, container);
return true;
}
}
@@ -842,7 +838,7 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
if (preemptedTaskList != null) {
for (TaskInfo taskInfo : preemptedTaskList) {
LOG.info("DBG: Preempting task {}", taskInfo);
- appClientDelegate.preemptContainer(taskInfo.containerId);
+ getContext().preemptContainer(taskInfo.containerId);
}
}
// The schedule loop will be triggered again when the deallocateTask request comes in for the
http://git-wip-us.apache.org/repos/asf/hive/blob/5cd092b8/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
index b1cd15e..245c140 100644
--- a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
+++ b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -40,11 +41,11 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ControlledClock;
-import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -57,7 +58,7 @@ public class TestLlapTaskSchedulerService {
private static final String HOST3 = "host3";
@Test (timeout = 5000)
- public void testSimpleLocalAllocation() {
+ public void testSimpleLocalAllocation() throws IOException {
TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper();
@@ -79,7 +80,7 @@ public class TestLlapTaskSchedulerService {
}
@Test (timeout = 5000)
- public void testSimpleNoLocalityAllocation() {
+ public void testSimpleNoLocalityAllocation() throws IOException {
TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper();
try {
@@ -101,7 +102,7 @@ public class TestLlapTaskSchedulerService {
// task triggers the next task to be scheduled.
@Test(timeout=5000)
- public void testPreemption() throws InterruptedException {
+ public void testPreemption() throws InterruptedException, IOException {
Priority priority1 = Priority.newInstance(1);
Priority priority2 = Priority.newInstance(2);
@@ -155,7 +156,7 @@ public class TestLlapTaskSchedulerService {
}
@Test(timeout=5000)
- public void testNodeDisabled() {
+ public void testNodeDisabled() throws IOException {
TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(10000l);
try {
Priority priority1 = Priority.newInstance(1);
@@ -200,7 +201,7 @@ public class TestLlapTaskSchedulerService {
}
// Flaky test disabled @Test(timeout=5000)
- public void testNodeReEnabled() throws InterruptedException {
+ public void testNodeReEnabled() throws InterruptedException, IOException {
// Based on actual timing.
TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(1000l);
try {
@@ -274,22 +275,22 @@ public class TestLlapTaskSchedulerService {
private static class TestTaskSchedulerServiceWrapper {
static final Resource resource = Resource.newInstance(1024, 1);
Configuration conf;
- TaskSchedulerAppCallback mockAppCallback = mock(TaskSchedulerAppCallback.class);
- AppContext mockAppContext = mock(AppContext.class);
+ TaskSchedulerContext mockAppCallback = mock(TaskSchedulerContext.class);
ControlledClock clock = new ControlledClock(new SystemClock());
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1000, 1), 1);
LlapTaskSchedulerServiceForTest ts;
- TestTaskSchedulerServiceWrapper() {
+ TestTaskSchedulerServiceWrapper() throws IOException {
this(2000l);
}
- TestTaskSchedulerServiceWrapper(long disableTimeoutMillis) {
+ TestTaskSchedulerServiceWrapper(long disableTimeoutMillis) throws IOException {
this(disableTimeoutMillis, new String[]{HOST1, HOST2, HOST3}, 4,
LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE_DEFAULT);
}
- TestTaskSchedulerServiceWrapper(long disableTimeoutMillis, String[] hosts, int numExecutors, int waitQueueSize) {
+ TestTaskSchedulerServiceWrapper(long disableTimeoutMillis, String[] hosts, int numExecutors, int waitQueueSize) throws
+ IOException {
conf = new Configuration();
conf.setStrings(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS, hosts);
conf.setInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, numExecutors);
@@ -298,12 +299,13 @@ public class TestLlapTaskSchedulerService {
disableTimeoutMillis);
conf.setBoolean(LlapTaskSchedulerServiceForTest.LLAP_TASK_SCHEDULER_IN_TEST, true);
- doReturn(clock).when(mockAppContext).getClock();
- doReturn(appAttemptId).when(mockAppContext).getApplicationAttemptId();
+ doReturn(appAttemptId).when(mockAppCallback).getApplicationAttemptId();
+ doReturn(11111l).when(mockAppCallback).getCustomClusterIdentifier();
+ doReturn(conf).when(mockAppCallback).getInitialConfiguration();
- ts = new LlapTaskSchedulerServiceForTest(mockAppCallback, mockAppContext, null, 0, null, 11111, conf);
+ ts = new LlapTaskSchedulerServiceForTest(mockAppCallback, clock);
- ts.init(conf);
+ ts.initialize();
ts.start();
// One scheduler pass from the nodes that are added at startup
awaitSchedulerRunNumber(1);
@@ -329,7 +331,7 @@ public class TestLlapTaskSchedulerService {
}
void shutdown() {
- ts.stop();
+ ts.shutdown();
}
void allocateTask(Object task, String[] hosts, Priority priority, Object clientCookie) {
@@ -358,18 +360,9 @@ public class TestLlapTaskSchedulerService {
private boolean forTestSchedulerGoSignal = false;
public LlapTaskSchedulerServiceForTest(
- TaskSchedulerAppCallback appClient, AppContext appContext, String clientHostname,
- int clientPort, String trackingUrl, long customAppIdIdentifier,
- Configuration conf) {
- super(appClient, appContext, clientHostname, clientPort, trackingUrl, customAppIdIdentifier,
- conf);
- this.inTest = conf.getBoolean(LLAP_TASK_SCHEDULER_IN_TEST, false);
- }
-
- @Override
- TaskSchedulerAppCallback createAppCallbackDelegate(
- TaskSchedulerAppCallback realAppClient) {
- return realAppClient;
+ TaskSchedulerContext appClient, Clock clock) {
+ super(appClient, clock);
+ this.inTest = appClient.getInitialConfiguration().getBoolean(LLAP_TASK_SCHEDULER_IN_TEST, false);
}
protected void schedulePendingTasks() {