You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/08 20:08:32 UTC

tez git commit: TEZ-2426. Ensure the eventRouter thread completes before switching to a new task and thread safety fixes in IPOContexts. (sseth)

Repository: tez
Updated Branches:
  refs/heads/master 6e6ad706f -> ce69aa1e2


TEZ-2426. Ensure the eventRouter thread completes before switching to a
new task and thread safety fixes in IPOContexts. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ce69aa1e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ce69aa1e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ce69aa1e

Branch: refs/heads/master
Commit: ce69aa1e2ca3320d33c833a96a158f94bfd73f52
Parents: 6e6ad70
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri May 8 11:08:14 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri May 8 11:08:14 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../runtime/LogicalIOProcessorRuntimeTask.java  | 110 +++++++++++++------
 .../runtime/api/impl/TezInputContextImpl.java   |  11 +-
 .../runtime/api/impl/TezOutputContextImpl.java  |   2 +-
 .../api/impl/TezProcessorContextImpl.java       |   4 +-
 .../runtime/api/impl/TezTaskContextImpl.java    |   9 +-
 .../apache/tez/runtime/task/TaskReporter.java   |  47 +++++---
 .../TestLogicalIOProcessorRuntimeTask.java      |  48 +++++---
 8 files changed, 160 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/ce69aa1e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 185e1b0..efb19b2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,7 @@ INCOMPATIBLE CHANGES
     Default max limit increased. Should not affect existing users.
 
 ALL CHANGES:
+  TEZ-2426. Ensure the eventRouter thread completes before switching to a new task and thread safety fixes in IPOContexts.
   TEZ-2412. Should kill vertex in DAGImpl#VertexRerunWhileCommitting
   TEZ-2410. VertexGroupCommitFinishedEvent & VertexCommitStartedEvent is not logged correctly
   TEZ-776. Reduce AM mem usage caused by storing TezEvents

http://git-wip-us.apache.org/repos/asf/tez/blob/ce69aa1e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index f465d3c..1cfe538 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
@@ -96,45 +97,46 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private static final Logger LOG = LoggerFactory
       .getLogger(LogicalIOProcessorRuntimeTask.class);
 
+  @VisibleForTesting // All fields non private for testing.
   private final String[] localDirs;
   /** Responsible for maintaining order of Inputs */
-  private final List<InputSpec> inputSpecs;
-  private final Map<String, LogicalInput> inputsMap;
-  private final Map<String, InputContext> inputContextMap;
+  final List<InputSpec> inputSpecs;
+  final ConcurrentMap<String, LogicalInput> inputsMap;
+  final ConcurrentMap<String, InputContext> inputContextMap;
   /** Responsible for maintaining order of Outputs */
-  private final List<OutputSpec> outputSpecs;
-  private final Map<String, LogicalOutput> outputsMap;
-  private final Map<String, OutputContext> outputContextMap;
+  final List<OutputSpec> outputSpecs;
+  final ConcurrentMap<String, LogicalOutput> outputsMap;
+  final ConcurrentMap<String, OutputContext> outputContextMap;
 
-  private final List<GroupInputSpec> groupInputSpecs;
-  private ConcurrentHashMap<String, MergedLogicalInput> groupInputsMap;
+  final List<GroupInputSpec> groupInputSpecs;
+  ConcurrentHashMap<String, MergedLogicalInput> groupInputsMap;
 
-  private final ProcessorDescriptor processorDescriptor;
-  private AbstractLogicalIOProcessor processor;
-  private ProcessorContext processorContext;
+  final ProcessorDescriptor processorDescriptor;
+  AbstractLogicalIOProcessor processor;
+  ProcessorContext processorContext;
 
   private final MemoryDistributor initialMemoryDistributor;
 
   /** Maps which will be provided to the processor run method */
-  private final LinkedHashMap<String, LogicalInput> runInputMap;
-  private final LinkedHashMap<String, LogicalOutput> runOutputMap;
+  final LinkedHashMap<String, LogicalInput> runInputMap;
+  final LinkedHashMap<String, LogicalOutput> runOutputMap;
   
   private final Map<String, ByteBuffer> serviceConsumerMetadata;
   private final Map<String, String> envMap;
 
-  private final ExecutorService initializerExecutor;
+  final ExecutorService initializerExecutor;
   private final CompletionService<Void> initializerCompletionService;
 
   private final Multimap<String, String> startedInputsMap;
 
-  private LinkedBlockingQueue<TezEvent> eventsToBeProcessed;
-  private Thread eventRouterThread = null;
+  LinkedBlockingQueue<TezEvent> eventsToBeProcessed;
+  Thread eventRouterThread = null;
 
   private final int appAttemptNumber;
 
-  private final InputReadyTracker inputReadyTracker;
+  private volatile InputReadyTracker inputReadyTracker;
   
-  private final ObjectRegistry objectRegistry;
+  private volatile ObjectRegistry objectRegistry;
   private final ExecutionContext ExecutionContext;
   private final long memAvailable;
 
@@ -143,6 +145,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> envMap,
       Multimap<String, String> startedInputsMap, ObjectRegistry objectRegistry,
       String pid, ExecutionContext ExecutionContext, long memAvailable) throws IOException {
+    // Note: If adding any fields here, make sure they're cleaned up in the cleanupStructures method.
     // TODO Remove jobToken from here post TEZ-421
     super(taskSpec, tezConf, tezUmbilical, pid);
     LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: "
@@ -361,6 +364,14 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       setTaskDone();
       if (eventRouterThread != null) {
         eventRouterThread.interrupt();
+        LOG.info("Joining on EventRouter");
+        try {
+          eventRouterThread.join();
+        } catch (InterruptedException e) {
+          LOG.info("Ignoring interrupt while waiting for the router thread to die");
+          Thread.currentThread().interrupt();
+        }
+        eventRouterThread = null;
       }
     }
   }
@@ -694,14 +705,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     eventRouterThread.start();
   }
 
-  private void cleanupInputOutputs() {
-    if (groupInputsMap != null) {
-      groupInputsMap.clear();
-    }
-    inputsMap.clear();
-    outputsMap.clear();
-  }
-
   private void closeContexts() throws IOException {
     closeContext(inputContextMap);
     closeContext(outputContextMap);
@@ -725,19 +728,62 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     }
   }
 
-  public synchronized void cleanup() {
+  public void cleanup() throws InterruptedException {
+    LOG.info("Final Counters : " + getCounters().toShortString());
+    setTaskDone();
+    if (eventRouterThread != null) {
+      eventRouterThread.interrupt();
+      LOG.info("Joining on EventRouter");
+      try {
+        eventRouterThread.join();
+      } catch (InterruptedException e) {
+        LOG.info("Ignoring interrupt while waiting for the router thread to die");
+        Thread.currentThread().interrupt();
+      }
+      eventRouterThread = null;
+    }
     try {
-      cleanupInputOutputs();
       closeContexts();
+      // Cleanup references which may be held by misbehaved tasks.
+      cleanupStructures();
     } catch (IOException e) {
       LOG.info("Error while cleaning up contexts ", e);
     }
+  }
 
-    LOG.info("Final Counters : " + getCounters().toShortString());
-    setTaskDone();
-    if (eventRouterThread != null) {
-      eventRouterThread.interrupt();
+  private void cleanupStructures() {
+    if (initializerExecutor != null && !initializerExecutor.isShutdown()) {
+      initializerExecutor.shutdownNow();
+    }
+    inputsMap.clear();
+    outputsMap.clear();
+
+    inputSpecs.clear();
+    outputSpecs.clear();
+
+    inputsMap.clear();
+    outputsMap.clear();
+
+    inputContextMap.clear();
+    outputContextMap.clear();
+
+    if (groupInputSpecs != null) {
+      groupInputSpecs.clear();
     }
+    if (groupInputsMap != null) {
+      groupInputsMap.clear();
+      groupInputsMap = null;
+    }
+
+    processor = null;
+    processorContext = null;
+
+    runInputMap.clear();
+    runOutputMap.clear();
+
+    eventsToBeProcessed.clear();
+    inputReadyTracker = null;
+    objectRegistry = null;
   }
   
   @Private

http://git-wip-us.apache.org/repos/asf/tez/blob/ce69aa1e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
index 101aeb9..8d6466a 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
@@ -55,12 +55,12 @@ public class TezInputContextImpl extends TezTaskContextImpl
 
   private static final Logger LOG = LoggerFactory.getLogger(TezInputContextImpl.class);
 
-  private UserPayload userPayload;
+  private volatile UserPayload userPayload;
   private final String sourceVertexName;
   private final EventMetaData sourceInfo;
   private final int inputIndex;
   private final Map<String, LogicalInput> inputs;
-  private InputReadyTracker inputReadyTracker;
+  private volatile InputReadyTracker inputReadyTracker;
   private final InputStatisticsReporterImpl statsReporter;
   
   class InputStatisticsReporterImpl implements InputStatisticsReporter {
@@ -159,7 +159,11 @@ public class TezInputContextImpl extends TezTaskContextImpl
 
   @Override
   public void inputIsReady() {
-    inputReadyTracker.setInputIsReady(inputs.get(sourceVertexName));
+    if (inputReadyTracker != null) {
+      inputReadyTracker.setInputIsReady(inputs.get(sourceVertexName));
+    } else {
+      LOG.warn("Ignoring Input Ready notification since the Task has already been closed");
+    }
   }
 
   @Override
@@ -172,7 +176,6 @@ public class TezInputContextImpl extends TezTaskContextImpl
     super.close();
     this.userPayload = null;
     this.inputReadyTracker = null;
-    inputs.clear();
     LOG.info("Cleared TezInputContextImpl related information");
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/ce69aa1e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
index b46cfd2..71e96db 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -53,7 +53,7 @@ public class TezOutputContextImpl extends TezTaskContextImpl
 
   private static final Logger LOG = LoggerFactory.getLogger(TezOutputContextImpl.class);
 
-  private UserPayload userPayload;
+  private volatile UserPayload userPayload;
   private final String destinationVertexName;
   private final EventMetaData sourceInfo;
   private final int outputIndex;

http://git-wip-us.apache.org/repos/asf/tez/blob/ce69aa1e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index d6b3ec5..a191ae8 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -53,8 +53,8 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce
 
   private static final Logger LOG = LoggerFactory.getLogger(TezProcessorContextImpl.class);
 
-  private UserPayload userPayload;
-  private InputReadyTracker inputReadyTracker;
+  private volatile UserPayload userPayload;
+  private volatile InputReadyTracker inputReadyTracker;
   private final EventMetaData sourceInfo;
 
   public TezProcessorContextImpl(Configuration conf, String[] workDirs, int appAttemptNumber,

http://git-wip-us.apache.org/repos/asf/tez/blob/ce69aa1e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
index 170741a..5f04c80 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
@@ -54,15 +54,15 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable {
   private final TezCounters counters;
   private String[] workDirs;
   private String uniqueIdentifier;
-  protected LogicalIOProcessorRuntimeTask runtimeTask;
+  protected final LogicalIOProcessorRuntimeTask runtimeTask;
   protected final TezUmbilical tezUmbilical;
   private final Map<String, ByteBuffer> serviceConsumerMetadata;
   private final int appAttemptNumber;
   private final Map<String, String> auxServiceEnv;
-  protected MemoryDistributor initialMemoryDistributor;
+  protected volatile MemoryDistributor initialMemoryDistributor;
   protected final EntityDescriptor<?> descriptor;
   private final String dagName;
-  private ObjectRegistry objectRegistry;
+  private volatile ObjectRegistry objectRegistry;
   private final int vertexParallelism;
   private final ExecutionContext ExecutionContext;
   private final long memAvailable;
@@ -225,7 +225,8 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable {
 
   @Override
   public void close() throws IOException {
-    this.runtimeTask = null;
+    Preconditions.checkState(runtimeTask.isTaskDone(),
+        "Runtime task must be complete before calling cleanup");
     this.objectRegistry = null;
     this.initialMemoryDistributor = null;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/ce69aa1e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
index 3d1d1a2..8b9db16 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
@@ -112,6 +113,7 @@ public class TaskReporter {
   public synchronized void unregisterTask(TezTaskAttemptID taskAttemptID) {
     currentCallable.markComplete();
     currentCallable = null;
+    // TODO: Make sure the callable completes before proceeding
   }
   
   public void shutdown() {
@@ -125,7 +127,7 @@ public class TaskReporter {
     private static final float LOG_COUNTER_BACKOFF = 1.3f;
 
     private final RuntimeTask task;
-    private EventMetaData updateEventMetadata;
+    private final EventMetaData updateEventMetadata;
 
     private final TezTaskUmbilicalProtocol umbilical;
 
@@ -136,6 +138,9 @@ public class TaskReporter {
 
     private final AtomicLong requestCounter;
 
+    private final AtomicBoolean finalEventQueued = new AtomicBoolean(false);
+    private final AtomicBoolean askedToDie = new AtomicBoolean(false);
+
     private LinkedBlockingQueue<TezEvent> eventsToSend = new LinkedBlockingQueue<TezEvent>();
 
     private final ReentrantLock lock = new ReentrantLock();
@@ -199,6 +204,9 @@ public class TaskReporter {
       }
       int pendingEventCount = eventsToSend.size();
       if (pendingEventCount > 0) {
+        // This is OK because the pending events will be sent via the succeeded/failed messages.
+        // TaskDone is set before taskSucceeded / taskFailed are sent out - which is what causes the
+        // thread to exit.
         LOG.warn("Exiting TaskReporter thread with pending queue size=" + pendingEventCount);
       }
       return true;
@@ -256,6 +264,7 @@ public class TaskReporter {
 
       if (response.shouldDie()) {
         LOG.info("Received should die response from AM");
+        askedToDie.set(true);
         return new ResponseWrapper(true, 1);
       }
       if (response.getLastRequestId() != requestId) {
@@ -268,7 +277,7 @@ public class TaskReporter {
       int numEventsReceived = 0;
       if (task.isTaskDone() || task.hadFatalError()) {
         if (response.getEvents() != null && !response.getEvents().isEmpty()) {
-          LOG.warn("Current task already complete, Ignoring all event in"
+          LOG.info("Current task already complete, Ignoring all event in"
               + " heartbeat response, eventCount=" + response.getEvents().size());
         }
       } else {
@@ -315,10 +324,16 @@ public class TaskReporter {
      *           indicates an exception somewhere in the AM.
      */
     private boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException {
-      TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata);
-      TezEvent taskCompletedEvent = new TezEvent(new TaskAttemptCompletedEvent(),
-          updateEventMetadata);
-      return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskCompletedEvent)).shouldDie;
+      // Ensure only one final event is ever sent.
+      if (!finalEventQueued.getAndSet(true)) {
+        TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata);
+        TezEvent taskCompletedEvent = new TezEvent(new TaskAttemptCompletedEvent(),
+            updateEventMetadata);
+        return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskCompletedEvent)).shouldDie;
+      } else {
+        LOG.warn("A final task state event has already been sent. Not sending again");
+        return askedToDie.get();
+      }
     }
     
     @VisibleForTesting
@@ -351,15 +366,21 @@ public class TaskReporter {
      */
     private boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics,
         EventMetaData srcMeta) throws IOException, TezException {
-      TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata);
-      if (diagnostics == null) {
-        diagnostics = ExceptionUtils.getStackTrace(t);
+      // Ensure only one final event is ever sent.
+      if (!finalEventQueued.getAndSet(true)) {
+        TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata);
+        if (diagnostics == null) {
+          diagnostics = ExceptionUtils.getStackTrace(t);
+        } else {
+          diagnostics = diagnostics + ":" + ExceptionUtils.getStackTrace(t);
+        }
+        TezEvent taskAttemptFailedEvent = new TezEvent(new TaskAttemptFailedEvent(diagnostics),
+            srcMeta == null ? updateEventMetadata : srcMeta);
+        return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent)).shouldDie;
       } else {
-        diagnostics = diagnostics + ":" + ExceptionUtils.getStackTrace(t);
+        LOG.warn("A final task state event has already been sent. Not sending again");
+        return askedToDie.get();
       }
-      TezEvent taskAttemptFailedEvent = new TezEvent(new TaskAttemptFailedEvent(diagnostics),
-          srcMeta == null ? updateEventMetadata : srcMeta);
-      return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent)).shouldDie;
     }
 
     private void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) {

http://git-wip-us.apache.org/repos/asf/tez/blob/ce69aa1e/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
index df932cf..b337bc7 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
@@ -19,6 +19,7 @@
 package org.apache.tez.runtime;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
@@ -26,6 +27,7 @@ import static org.mockito.Mockito.mock;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -33,6 +35,7 @@ import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -133,29 +136,43 @@ public class TestLogicalIOProcessorRuntimeTask {
 
   }
 
-  private void cleanupAndTest(LogicalIOProcessorRuntimeTask lio) {
+  private void cleanupAndTest(LogicalIOProcessorRuntimeTask lio) throws InterruptedException {
+
+    ProcessorContext procContext = lio.getProcessorContext();
+    List<InputContext> inputContexts = new LinkedList<InputContext>();
+    inputContexts.addAll(lio.getInputContexts());
+    List<OutputContext> outputContexts = new LinkedList<OutputContext>();
+    outputContexts.addAll(lio.getOutputContexts());
 
     lio.cleanup();
 
-    assertTrue(lio.getProcessorContext().getUserPayload() == null);
-    assertTrue(lio.getProcessorContext().getObjectRegistry() == null);
+    assertTrue(procContext.getUserPayload() == null);
+    assertTrue(procContext.getObjectRegistry() == null);
 
-    try {
-      lio.getProcessorContext().waitForAnyInputReady(Collections.<Input>emptyList());
-      fail("Processor context should have been already cleanup");
-    } catch (Throwable t) {
-      assertTrue(t instanceof NullPointerException);
+    for (InputContext inputContext : inputContexts) {
+      assertTrue(inputContext.getUserPayload() == null);
+      assertTrue(inputContext.getObjectRegistry() == null);
     }
 
-    try {
-      lio.getProcessorContext().requestInitialMemory(0, null);
-      fail("Processor context should have been already cleanup");
-    } catch (Throwable t) {
-      assertTrue(t instanceof NullPointerException);
+    for (OutputContext outputContext : outputContexts) {
+      assertTrue(outputContext.getUserPayload() == null);
+      assertTrue(outputContext.getObjectRegistry() == null);
     }
 
-    assertTrue(lio.getInputContexts().size() == 0);
-    assertTrue(lio.getOutputContexts().size() == 0);
+    assertEquals(0, lio.inputSpecs.size());
+    assertEquals(0, lio.inputsMap.size());
+    assertEquals(0, lio.inputContextMap.size());
+    assertEquals(0, lio.outputSpecs.size());
+    assertEquals(0, lio.outputsMap.size());
+    assertEquals(0, lio.outputContextMap.size());
+    assertTrue(lio.groupInputSpecs == null || lio.groupInputSpecs.size() == 0);
+    assertNull(lio.groupInputsMap);
+    assertNull(lio.processor);
+    assertNull(lio.processorContext);
+    assertEquals(0, lio.runInputMap.size());
+    assertEquals(0, lio.runOutputMap.size());
+    assertEquals(0, lio.eventsToBeProcessed.size());
+    assertNull(lio.eventRouterThread);
   }
 
   private TaskSpec createTaskSpec(TezTaskAttemptID taskAttemptID,
@@ -248,7 +265,6 @@ public class TestLogicalIOProcessorRuntimeTask {
     public void start() throws Exception {
       startCount++;
       this.vertexParallelism = getContext().getVertexParallelism();
-      System.err.println("In started");
     }
 
     @Override