You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/08 20:08:32 UTC
tez git commit: TEZ-2426. Ensure the eventRouter thread completes
before switching to a new task and thread safety fixes in IPOContexts.
(sseth)
Repository: tez
Updated Branches:
refs/heads/master 6e6ad706f -> ce69aa1e2
TEZ-2426. Ensure the eventRouter thread completes before switching to a
new task and thread safety fixes in IPOContexts. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ce69aa1e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ce69aa1e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ce69aa1e
Branch: refs/heads/master
Commit: ce69aa1e2ca3320d33c833a96a158f94bfd73f52
Parents: 6e6ad70
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri May 8 11:08:14 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri May 8 11:08:14 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../runtime/LogicalIOProcessorRuntimeTask.java | 110 +++++++++++++------
.../runtime/api/impl/TezInputContextImpl.java | 11 +-
.../runtime/api/impl/TezOutputContextImpl.java | 2 +-
.../api/impl/TezProcessorContextImpl.java | 4 +-
.../runtime/api/impl/TezTaskContextImpl.java | 9 +-
.../apache/tez/runtime/task/TaskReporter.java | 47 +++++---
.../TestLogicalIOProcessorRuntimeTask.java | 48 +++++---
8 files changed, 160 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/ce69aa1e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 185e1b0..efb19b2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,7 @@ INCOMPATIBLE CHANGES
Default max limit increased. Should not affect existing users.
ALL CHANGES:
+ TEZ-2426. Ensure the eventRouter thread completes before switching to a new task and thread safety fixes in IPOContexts.
TEZ-2412. Should kill vertex in DAGImpl#VertexRerunWhileCommitting
TEZ-2410. VertexGroupCommitFinishedEvent & VertexCommitStartedEvent is not logged correctly
TEZ-776. Reduce AM mem usage caused by storing TezEvents
http://git-wip-us.apache.org/repos/asf/tez/blob/ce69aa1e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index f465d3c..1cfe538 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -30,6 +30,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
@@ -96,45 +97,46 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
private static final Logger LOG = LoggerFactory
.getLogger(LogicalIOProcessorRuntimeTask.class);
+ @VisibleForTesting // All fields non private for testing.
private final String[] localDirs;
/** Responsible for maintaining order of Inputs */
- private final List<InputSpec> inputSpecs;
- private final Map<String, LogicalInput> inputsMap;
- private final Map<String, InputContext> inputContextMap;
+ final List<InputSpec> inputSpecs;
+ final ConcurrentMap<String, LogicalInput> inputsMap;
+ final ConcurrentMap<String, InputContext> inputContextMap;
/** Responsible for maintaining order of Outputs */
- private final List<OutputSpec> outputSpecs;
- private final Map<String, LogicalOutput> outputsMap;
- private final Map<String, OutputContext> outputContextMap;
+ final List<OutputSpec> outputSpecs;
+ final ConcurrentMap<String, LogicalOutput> outputsMap;
+ final ConcurrentMap<String, OutputContext> outputContextMap;
- private final List<GroupInputSpec> groupInputSpecs;
- private ConcurrentHashMap<String, MergedLogicalInput> groupInputsMap;
+ final List<GroupInputSpec> groupInputSpecs;
+ ConcurrentHashMap<String, MergedLogicalInput> groupInputsMap;
- private final ProcessorDescriptor processorDescriptor;
- private AbstractLogicalIOProcessor processor;
- private ProcessorContext processorContext;
+ final ProcessorDescriptor processorDescriptor;
+ AbstractLogicalIOProcessor processor;
+ ProcessorContext processorContext;
private final MemoryDistributor initialMemoryDistributor;
/** Maps which will be provided to the processor run method */
- private final LinkedHashMap<String, LogicalInput> runInputMap;
- private final LinkedHashMap<String, LogicalOutput> runOutputMap;
+ final LinkedHashMap<String, LogicalInput> runInputMap;
+ final LinkedHashMap<String, LogicalOutput> runOutputMap;
private final Map<String, ByteBuffer> serviceConsumerMetadata;
private final Map<String, String> envMap;
- private final ExecutorService initializerExecutor;
+ final ExecutorService initializerExecutor;
private final CompletionService<Void> initializerCompletionService;
private final Multimap<String, String> startedInputsMap;
- private LinkedBlockingQueue<TezEvent> eventsToBeProcessed;
- private Thread eventRouterThread = null;
+ LinkedBlockingQueue<TezEvent> eventsToBeProcessed;
+ Thread eventRouterThread = null;
private final int appAttemptNumber;
- private final InputReadyTracker inputReadyTracker;
+ private volatile InputReadyTracker inputReadyTracker;
- private final ObjectRegistry objectRegistry;
+ private volatile ObjectRegistry objectRegistry;
private final ExecutionContext ExecutionContext;
private final long memAvailable;
@@ -143,6 +145,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> envMap,
Multimap<String, String> startedInputsMap, ObjectRegistry objectRegistry,
String pid, ExecutionContext ExecutionContext, long memAvailable) throws IOException {
+ // Note: If adding any fields here, make sure they're cleaned up in the cleanupStructures method.
// TODO Remove jobToken from here post TEZ-421
super(taskSpec, tezConf, tezUmbilical, pid);
LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: "
@@ -361,6 +364,14 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
setTaskDone();
if (eventRouterThread != null) {
eventRouterThread.interrupt();
+ LOG.info("Joining on EventRouter");
+ try {
+ eventRouterThread.join();
+ } catch (InterruptedException e) {
+ LOG.info("Ignoring interrupt while waiting for the router thread to die");
+ Thread.currentThread().interrupt();
+ }
+ eventRouterThread = null;
}
}
}
@@ -694,14 +705,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
eventRouterThread.start();
}
- private void cleanupInputOutputs() {
- if (groupInputsMap != null) {
- groupInputsMap.clear();
- }
- inputsMap.clear();
- outputsMap.clear();
- }
-
private void closeContexts() throws IOException {
closeContext(inputContextMap);
closeContext(outputContextMap);
@@ -725,19 +728,62 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
}
}
- public synchronized void cleanup() {
+ public void cleanup() throws InterruptedException {
+ LOG.info("Final Counters : " + getCounters().toShortString());
+ setTaskDone();
+ if (eventRouterThread != null) {
+ eventRouterThread.interrupt();
+ LOG.info("Joining on EventRouter");
+ try {
+ eventRouterThread.join();
+ } catch (InterruptedException e) {
+ LOG.info("Ignoring interrupt while waiting for the router thread to die");
+ Thread.currentThread().interrupt();
+ }
+ eventRouterThread = null;
+ }
try {
- cleanupInputOutputs();
closeContexts();
+ // Cleanup references which may be held by misbehaved tasks.
+ cleanupStructures();
} catch (IOException e) {
LOG.info("Error while cleaning up contexts ", e);
}
+ }
- LOG.info("Final Counters : " + getCounters().toShortString());
- setTaskDone();
- if (eventRouterThread != null) {
- eventRouterThread.interrupt();
+ private void cleanupStructures() {
+ if (initializerExecutor != null && !initializerExecutor.isShutdown()) {
+ initializerExecutor.shutdownNow();
+ }
+ inputsMap.clear();
+ outputsMap.clear();
+
+ inputSpecs.clear();
+ outputSpecs.clear();
+
+ inputsMap.clear();
+ outputsMap.clear();
+
+ inputContextMap.clear();
+ outputContextMap.clear();
+
+ if (groupInputSpecs != null) {
+ groupInputSpecs.clear();
}
+ if (groupInputsMap != null) {
+ groupInputsMap.clear();
+ groupInputsMap = null;
+ }
+
+ processor = null;
+ processorContext = null;
+
+ runInputMap.clear();
+ runOutputMap.clear();
+
+ eventsToBeProcessed.clear();
+ inputReadyTracker = null;
+ objectRegistry = null;
}
@Private
http://git-wip-us.apache.org/repos/asf/tez/blob/ce69aa1e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
index 101aeb9..8d6466a 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
@@ -55,12 +55,12 @@ public class TezInputContextImpl extends TezTaskContextImpl
private static final Logger LOG = LoggerFactory.getLogger(TezInputContextImpl.class);
- private UserPayload userPayload;
+ private volatile UserPayload userPayload;
private final String sourceVertexName;
private final EventMetaData sourceInfo;
private final int inputIndex;
private final Map<String, LogicalInput> inputs;
- private InputReadyTracker inputReadyTracker;
+ private volatile InputReadyTracker inputReadyTracker;
private final InputStatisticsReporterImpl statsReporter;
class InputStatisticsReporterImpl implements InputStatisticsReporter {
@@ -159,7 +159,11 @@ public class TezInputContextImpl extends TezTaskContextImpl
@Override
public void inputIsReady() {
- inputReadyTracker.setInputIsReady(inputs.get(sourceVertexName));
+ if (inputReadyTracker != null) {
+ inputReadyTracker.setInputIsReady(inputs.get(sourceVertexName));
+ } else {
+ LOG.warn("Ignoring Input Ready notification since the Task has already been closed");
+ }
}
@Override
@@ -172,7 +176,6 @@ public class TezInputContextImpl extends TezTaskContextImpl
super.close();
this.userPayload = null;
this.inputReadyTracker = null;
- inputs.clear();
LOG.info("Cleared TezInputContextImpl related information");
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/ce69aa1e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
index b46cfd2..71e96db 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -53,7 +53,7 @@ public class TezOutputContextImpl extends TezTaskContextImpl
private static final Logger LOG = LoggerFactory.getLogger(TezOutputContextImpl.class);
- private UserPayload userPayload;
+ private volatile UserPayload userPayload;
private final String destinationVertexName;
private final EventMetaData sourceInfo;
private final int outputIndex;
http://git-wip-us.apache.org/repos/asf/tez/blob/ce69aa1e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index d6b3ec5..a191ae8 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -53,8 +53,8 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce
private static final Logger LOG = LoggerFactory.getLogger(TezProcessorContextImpl.class);
- private UserPayload userPayload;
- private InputReadyTracker inputReadyTracker;
+ private volatile UserPayload userPayload;
+ private volatile InputReadyTracker inputReadyTracker;
private final EventMetaData sourceInfo;
public TezProcessorContextImpl(Configuration conf, String[] workDirs, int appAttemptNumber,
http://git-wip-us.apache.org/repos/asf/tez/blob/ce69aa1e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
index 170741a..5f04c80 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
@@ -54,15 +54,15 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable {
private final TezCounters counters;
private String[] workDirs;
private String uniqueIdentifier;
- protected LogicalIOProcessorRuntimeTask runtimeTask;
+ protected final LogicalIOProcessorRuntimeTask runtimeTask;
protected final TezUmbilical tezUmbilical;
private final Map<String, ByteBuffer> serviceConsumerMetadata;
private final int appAttemptNumber;
private final Map<String, String> auxServiceEnv;
- protected MemoryDistributor initialMemoryDistributor;
+ protected volatile MemoryDistributor initialMemoryDistributor;
protected final EntityDescriptor<?> descriptor;
private final String dagName;
- private ObjectRegistry objectRegistry;
+ private volatile ObjectRegistry objectRegistry;
private final int vertexParallelism;
private final ExecutionContext ExecutionContext;
private final long memAvailable;
@@ -225,7 +225,8 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable {
@Override
public void close() throws IOException {
- this.runtimeTask = null;
+ Preconditions.checkState(runtimeTask.isTaskDone(),
+ "Runtime task must be complete before calling cleanup");
this.objectRegistry = null;
this.initialMemoryDistributor = null;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/ce69aa1e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
index 3d1d1a2..8b9db16 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
@@ -112,6 +113,7 @@ public class TaskReporter {
public synchronized void unregisterTask(TezTaskAttemptID taskAttemptID) {
currentCallable.markComplete();
currentCallable = null;
+ // TODO: Make sure the callable completes before proceeding
}
public void shutdown() {
@@ -125,7 +127,7 @@ public class TaskReporter {
private static final float LOG_COUNTER_BACKOFF = 1.3f;
private final RuntimeTask task;
- private EventMetaData updateEventMetadata;
+ private final EventMetaData updateEventMetadata;
private final TezTaskUmbilicalProtocol umbilical;
@@ -136,6 +138,9 @@ public class TaskReporter {
private final AtomicLong requestCounter;
+ private final AtomicBoolean finalEventQueued = new AtomicBoolean(false);
+ private final AtomicBoolean askedToDie = new AtomicBoolean(false);
+
private LinkedBlockingQueue<TezEvent> eventsToSend = new LinkedBlockingQueue<TezEvent>();
private final ReentrantLock lock = new ReentrantLock();
@@ -199,6 +204,9 @@ public class TaskReporter {
}
int pendingEventCount = eventsToSend.size();
if (pendingEventCount > 0) {
+ // This is OK because the pending events will be sent via the succeeded/failed messages.
+ // TaskDone is set before taskSucceeded / taskFailed are sent out - which is what causes the
+ // thread to exit.
LOG.warn("Exiting TaskReporter thread with pending queue size=" + pendingEventCount);
}
return true;
@@ -256,6 +264,7 @@ public class TaskReporter {
if (response.shouldDie()) {
LOG.info("Received should die response from AM");
+ askedToDie.set(true);
return new ResponseWrapper(true, 1);
}
if (response.getLastRequestId() != requestId) {
@@ -268,7 +277,7 @@ public class TaskReporter {
int numEventsReceived = 0;
if (task.isTaskDone() || task.hadFatalError()) {
if (response.getEvents() != null && !response.getEvents().isEmpty()) {
- LOG.warn("Current task already complete, Ignoring all event in"
+ LOG.info("Current task already complete, Ignoring all event in"
+ " heartbeat response, eventCount=" + response.getEvents().size());
}
} else {
@@ -315,10 +324,16 @@ public class TaskReporter {
* indicates an exception somewhere in the AM.
*/
private boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException {
- TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata);
- TezEvent taskCompletedEvent = new TezEvent(new TaskAttemptCompletedEvent(),
- updateEventMetadata);
- return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskCompletedEvent)).shouldDie;
+ // Ensure only one final event is ever sent.
+ if (!finalEventQueued.getAndSet(true)) {
+ TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata);
+ TezEvent taskCompletedEvent = new TezEvent(new TaskAttemptCompletedEvent(),
+ updateEventMetadata);
+ return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskCompletedEvent)).shouldDie;
+ } else {
+ LOG.warn("A final task state event has already been sent. Not sending again");
+ return askedToDie.get();
+ }
}
@VisibleForTesting
@@ -351,15 +366,21 @@ public class TaskReporter {
*/
private boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics,
EventMetaData srcMeta) throws IOException, TezException {
- TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata);
- if (diagnostics == null) {
- diagnostics = ExceptionUtils.getStackTrace(t);
+ // Ensure only one final event is ever sent.
+ if (!finalEventQueued.getAndSet(true)) {
+ TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata);
+ if (diagnostics == null) {
+ diagnostics = ExceptionUtils.getStackTrace(t);
+ } else {
+ diagnostics = diagnostics + ":" + ExceptionUtils.getStackTrace(t);
+ }
+ TezEvent taskAttemptFailedEvent = new TezEvent(new TaskAttemptFailedEvent(diagnostics),
+ srcMeta == null ? updateEventMetadata : srcMeta);
+ return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent)).shouldDie;
} else {
- diagnostics = diagnostics + ":" + ExceptionUtils.getStackTrace(t);
+ LOG.warn("A final task state event has already been sent. Not sending again");
+ return askedToDie.get();
}
- TezEvent taskAttemptFailedEvent = new TezEvent(new TaskAttemptFailedEvent(diagnostics),
- srcMeta == null ? updateEventMetadata : srcMeta);
- return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent)).shouldDie;
}
private void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) {
http://git-wip-us.apache.org/repos/asf/tez/blob/ce69aa1e/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
index df932cf..b337bc7 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
@@ -19,6 +19,7 @@
package org.apache.tez.runtime;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
@@ -26,6 +27,7 @@ import static org.mockito.Mockito.mock;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -33,6 +35,7 @@ import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -133,29 +136,43 @@ public class TestLogicalIOProcessorRuntimeTask {
}
- private void cleanupAndTest(LogicalIOProcessorRuntimeTask lio) {
+ private void cleanupAndTest(LogicalIOProcessorRuntimeTask lio) throws InterruptedException {
+
+ ProcessorContext procContext = lio.getProcessorContext();
+ List<InputContext> inputContexts = new LinkedList<InputContext>();
+ inputContexts.addAll(lio.getInputContexts());
+ List<OutputContext> outputContexts = new LinkedList<OutputContext>();
+ outputContexts.addAll(lio.getOutputContexts());
lio.cleanup();
- assertTrue(lio.getProcessorContext().getUserPayload() == null);
- assertTrue(lio.getProcessorContext().getObjectRegistry() == null);
+ assertTrue(procContext.getUserPayload() == null);
+ assertTrue(procContext.getObjectRegistry() == null);
- try {
- lio.getProcessorContext().waitForAnyInputReady(Collections.<Input>emptyList());
- fail("Processor context should have been already cleanup");
- } catch (Throwable t) {
- assertTrue(t instanceof NullPointerException);
+ for (InputContext inputContext : inputContexts) {
+ assertTrue(inputContext.getUserPayload() == null);
+ assertTrue(inputContext.getObjectRegistry() == null);
}
- try {
- lio.getProcessorContext().requestInitialMemory(0, null);
- fail("Processor context should have been already cleanup");
- } catch (Throwable t) {
- assertTrue(t instanceof NullPointerException);
+ for (OutputContext outputContext : outputContexts) {
+ assertTrue(outputContext.getUserPayload() == null);
+ assertTrue(outputContext.getObjectRegistry() == null);
}
- assertTrue(lio.getInputContexts().size() == 0);
- assertTrue(lio.getOutputContexts().size() == 0);
+ assertEquals(0, lio.inputSpecs.size());
+ assertEquals(0, lio.inputsMap.size());
+ assertEquals(0, lio.inputContextMap.size());
+ assertEquals(0, lio.outputSpecs.size());
+ assertEquals(0, lio.outputsMap.size());
+ assertEquals(0, lio.outputContextMap.size());
+ assertTrue(lio.groupInputSpecs == null || lio.groupInputSpecs.size() == 0);
+ assertNull(lio.groupInputsMap);
+ assertNull(lio.processor);
+ assertNull(lio.processorContext);
+ assertEquals(0, lio.runInputMap.size());
+ assertEquals(0, lio.runOutputMap.size());
+ assertEquals(0, lio.eventsToBeProcessed.size());
+ assertNull(lio.eventRouterThread);
}
private TaskSpec createTaskSpec(TezTaskAttemptID taskAttemptID,
@@ -248,7 +265,6 @@ public class TestLogicalIOProcessorRuntimeTask {
public void start() throws Exception {
startCount++;
this.vertexParallelism = getContext().getVertexParallelism();
- System.err.println("In started");
}
@Override