You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2021/01/27 23:51:28 UTC

[tez] branch master updated: TEZ-3985: Correctness: Throw a clear exception for DMEs sent during cleanup (Contributed by Jaume M, reviewed by Rajesh Balamohan, Ashutosh Chauhan)

This is an automated email from the ASF dual-hosted git repository.

rbalamohan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new 04597c7  TEZ-3985: Correctness: Throw a clear exception for DMEs sent during cleanup (Contributed by Jaume M, reviewed by Rajesh Balamohan, Ashutosh Chauhan)
04597c7 is described below

commit 04597c7b591917bbe995a9e182c3be60e33816c9
Author: Rajesh Balamohan <rb...@apache.org>
AuthorDate: Thu Jan 28 05:20:25 2021 +0530

    TEZ-3985: Correctness: Throw a clear exception for DMEs sent during cleanup (Contributed by Jaume M, reviewed by Rajesh Balamohan, Ashutosh Chauhan)
---
 .../org/apache/tez/runtime/api/OutputContext.java  | 10 +++
 .../tez/runtime/api/impl/TezOutputContextImpl.java | 30 ++++++-
 .../tez/runtime/internals/api/TezTrapEvent.java    | 52 ++++++++++++
 .../runtime/internals/api/TezTrapEventType.java    | 29 +++++++
 .../tez/runtime/task/TaskRunner2Callable.java      | 12 ++-
 .../apache/tez/runtime/task/TezTaskRunner2.java    |  3 +-
 .../tez/runtime/task/TezTrapEventHandler.java      | 92 ++++++++++++++++++++
 .../runtime/TestLogicalIOProcessorRuntimeTask.java | 97 +++++++++++++++++++++-
 8 files changed, 318 insertions(+), 7 deletions(-)

diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java
index 882eb4b..33fe772 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java
@@ -19,6 +19,7 @@
 package org.apache.tez.runtime.api;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.yarn.event.EventHandler;
 
 /**
  * Context handle for the Output to initialize itself.
@@ -48,4 +49,13 @@ public interface OutputContext extends TaskContext {
    */
   public OutputStatisticsReporter getStatisticsReporter();
 
+  /**
+   * Notifies the context that, from this point on, no more events should be
+   * sent. This is a safety measure against events being sent after close or
+   * during cleanup: once this has been called, events queued for the AM are
+   * passed to the given event handler instead.
+   * @param eventHandler handler that receives the events after this call.
+   */
+  void trapEvents(EventHandler eventHandler);
+
 }
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
index ec8280a..20ec062 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -29,7 +29,9 @@ import javax.annotation.Nullable;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.TezExecutors;
+import org.apache.tez.runtime.internals.api.TezTrapEvent;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -47,17 +49,28 @@ import org.apache.tez.runtime.common.resources.MemoryDistributor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@SuppressWarnings("unchecked")
 public class TezOutputContextImpl extends TezTaskContextImpl
     implements OutputContext {
 
   private static final Logger LOG = LoggerFactory.getLogger(TezOutputContextImpl.class);
 
   private volatile UserPayload userPayload;
+
+  /**
+   * Holds whether we can accept more events to send to the AM.
+   */
+  private volatile boolean trapEvents;
   private final String destinationVertexName;
   private final EventMetaData sourceInfo;
   private final int outputIndex;
   private final OutputStatisticsReporterImpl statsReporter;
 
+  /**
+   * Handler for the events after the trap flag is set.
+   */
+  private EventHandler<TezTrapEvent> trapEventHandler;
+
   class OutputStatisticsReporterImpl implements OutputStatisticsReporter {
 
     @Override
@@ -71,7 +84,7 @@ public class TezOutputContextImpl extends TezTaskContextImpl
     public void reportItemsProcessed(long items) {
       // this is a concurrent map. Plus we are not adding/deleting entries
       runtimeTask.getTaskStatistics().getIOStatistics().get(destinationVertexName)
-      .setItemsProcessed(items);;
+      .setItemsProcessed(items);
     }
     
   }
@@ -124,7 +137,11 @@ public class TezOutputContextImpl extends TezTaskContextImpl
       TezEvent tEvt = new TezEvent(e, sourceInfo);
       tezEvents.add(tEvt);
     }
-    tezUmbilical.addEvents(tezEvents);
+    if (trapEvents) {
+      trapEventHandler.handle(new TezTrapEvent(tezEvents));
+    } else {
+      tezUmbilical.addEvents(tezEvents);
+    }
   }
 
   @Override
@@ -163,6 +180,15 @@ public class TezOutputContextImpl extends TezTaskContextImpl
     return statsReporter;
   }
 
+  /**
+   * Enables the trap: events sent from now on go to the handler, not the AM.
+   */
+  @Override
+  public final void trapEvents(final EventHandler eventHandler) {
+    this.trapEventHandler = eventHandler; // plain write, published below
+    trapEvents = true; // volatile write last: flag readers see the handler
+  }
+
   @Override
   public void close() throws IOException {
     super.close();
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TezTrapEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TezTrapEvent.java
new file mode 100644
index 0000000..8c63b74
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TezTrapEvent.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.internals.api;
+
+import org.apache.tez.common.TezAbstractEvent;
+import org.apache.tez.runtime.api.impl.TezEvent;
+
+import java.util.List;
+
+import static org.apache.tez.runtime.internals.api.TezTrapEventType.TRAP_EVENT_TYPE;
+
+/**
+ * Wraps the events an output queued for the AM after trapping was enabled.
+ */
+public class TezTrapEvent extends TezAbstractEvent<TezTrapEventType> {
+  /**
+   * Events that were reported; the list is held by reference, not copied.
+   */
+  private final List<TezEvent> tezEvents;
+
+  /**
+   * Create a tez trap event of type TRAP_EVENT_TYPE.
+   * @param events events tried to be sent to the AM; stored as given.
+   */
+  public TezTrapEvent(final List<TezEvent> events) {
+    super(TRAP_EVENT_TYPE);
+    this.tezEvents = events;
+  }
+
+  /**
+   * @return the list instance that was passed to the constructor.
+   */
+  public final List<TezEvent> getTezEvents() {
+    return tezEvents;
+  }
+}
\ No newline at end of file
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TezTrapEventType.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TezTrapEventType.java
new file mode 100644
index 0000000..89cb78e
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TezTrapEventType.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.internals.api;
+
+/**
+ * Event type used by {@code TezTrapEvent}; it has exactly one value.
+ */
+public enum TezTrapEventType {
+  /**
+   * The only value; TezTrapEvent passes it to its superclass constructor.
+   */
+  TRAP_EVENT_TYPE
+}
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
index 0e6dfda..810a806 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
@@ -24,6 +24,7 @@ import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.api.impl.TezUmbilical;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,10 +47,16 @@ public class TaskRunner2Callable extends CallableWithNdc<TaskRunner2Callable.Tas
 
   private volatile Thread ownThread;
 
+  /**
+   * Protocol to send the events.
+   */
+  private final TezUmbilical tezUmbilical;
+
   public TaskRunner2Callable(LogicalIOProcessorRuntimeTask task,
-                             UserGroupInformation ugi) {
+      final UserGroupInformation ugi, final TezUmbilical umbilical) {
     this.task = task;
     this.ugi = ugi;
+    this.tezUmbilical = umbilical;
   }
 
   @Override
@@ -109,6 +116,9 @@ public class TaskRunner2Callable extends CallableWithNdc<TaskRunner2Callable.Tas
       // For a successful task, however, this should be almost no delay since close has already happened.
       maybeFixInterruptStatus();
       LOG.info("Cleaning up task {}, stopRequested={}", task.getTaskAttemptID(), stopRequested.get());
+      task.getOutputContexts().forEach(outputContext
+          -> outputContext.trapEvents(new TezTrapEventHandler(outputContext,
+          this.tezUmbilical)));
       task.cleanup();
     }
   }
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
index ae81769..bbf037b 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
@@ -177,7 +177,8 @@ public class TezTaskRunner2 {
           // Safe to do this within a synchronized block because we're providing
           // the handler on which the Reporter will communicate back. Assuming
           // the register call doesn't end up hanging.
-          taskRunnerCallable = new TaskRunner2Callable(task, ugi);
+          taskRunnerCallable = new TaskRunner2Callable(task, ugi,
+              umbilicalAndErrorHandler);
           taskReporter.registerTask(task, umbilicalAndErrorHandler);
           future = executor.submit(taskRunnerCallable);
         }
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTrapEventHandler.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTrapEventHandler.java
new file mode 100644
index 0000000..b35dbb0
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTrapEventHandler.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.tez.runtime.task;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
+import org.apache.tez.runtime.api.impl.TezUmbilical;
+import org.apache.tez.runtime.internals.api.TezTrapEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Handles the events an output tries to send after the trap has been
+ * activated. Data movement events must not be sent at that point, so they
+ * are logged as errors and dropped; events of every other type are still
+ * forwarded to the AM through the umbilical.
+ */
+public class TezTrapEventHandler implements EventHandler<TezTrapEvent> {
+  /**
+   * Logger named after this class so its messages are attributed to it.
+   */
+  private static final Logger
+      LOG = LoggerFactory.getLogger(TezTrapEventHandler.class);
+
+  /**
+   * Output context that will report the events.
+   */
+  private final OutputContext outputContext;
+
+  /**
+   * Protocol to send the events.
+   */
+  private final TezUmbilical tezUmbilical;
+
+  /**
+   * @param output context that will report the events.
+   * @param umbilical used to send the events to the AM.
+   */
+  TezTrapEventHandler(final OutputContext output,
+      final TezUmbilical umbilical) {
+    this.outputContext = output;
+    this.tezUmbilical = umbilical;
+  }
+
+  /**
+   * Drops (and logs) data movement events; forwards the rest to the AM.
+   * @param tezTrapEvent event holding the tez events; the list must be set.
+   */
+  @Override
+  public final void handle(final TezTrapEvent tezTrapEvent) {
+    Preconditions.checkArgument(tezTrapEvent.getTezEvents() != null);
+    List<TezEvent> tezEvents = new ArrayList<TezEvent>(
+        tezTrapEvent.getTezEvents().size());
+    for (TezEvent tezEvent: tezTrapEvent.getTezEvents()) {
+      switch (tezEvent.getEventType()) {
+      case COMPOSITE_DATA_MOVEMENT_EVENT:
+      case DATA_MOVEMENT_EVENT:
+        String errorMsg = "Some events won't be sent to the AM because all"
+            + " the events should have been sent at this point. Most likely"
+            + " this would result in a bug. "
+            + " event:" + tezEvent.toString();
+        Throwable throwable = new Throwable(errorMsg); // never thrown
+        LOG.error(errorMsg, throwable); // logged with the stack trace
+        break;
+      default:
+        LOG.info("Event of type " + tezEvent.getEventType() + " will be sent"
+            + " to the AM after the task was closed ");
+        tezEvents.add(tezEvent);
+      }
+    }
+    tezUmbilical.addEvents(tezEvents); // called even if the list is empty
+  }
+}
\ No newline at end of file
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
index 599f98f..ba9a66d 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
@@ -24,13 +24,18 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tez.common.TezExecutors;
 import org.apache.tez.common.TezSharedExecutor;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
@@ -41,12 +46,15 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.hadoop.shim.DefaultHadoopShim;
+import org.apache.tez.hadoop.shim.HadoopShim;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.AbstractLogicalOutput;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ObjectRegistry;
 import org.apache.tez.runtime.api.Reader;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.OutputContext;
@@ -57,13 +65,16 @@ import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.api.impl.InputSpec;
 import org.apache.tez.runtime.api.impl.OutputSpec;
 import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.TezUmbilical;
 import org.apache.tez.runtime.common.resources.ScalingAllocator;
+import org.apache.tez.runtime.task.TaskRunner2Callable;
 import org.junit.Test;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
+
 import org.mockito.Mockito;
 
 public class TestLogicalIOProcessorRuntimeTask {
@@ -149,6 +160,39 @@ public class TestLogicalIOProcessorRuntimeTask {
 
   }
 
+  @Test
+  public void testEventsCantBeSentInCleanup() throws Exception {
+    TezDAGID dagId = createTezDagId();
+    TezVertexID vertexId = createTezVertexId(dagId);
+    Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<>();
+    Multimap<String, String> startedInputsMap = HashMultimap.create();
+    TezUmbilical umbilical = mock(TezUmbilical.class);
+    TezConfiguration tezConf = new TezConfiguration();
+    tezConf.set(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS,
+        ScalingAllocator.class.getName());
+
+    TezTaskAttemptID taId1 = createTaskAttemptID(vertexId, 1);
+    TaskSpec task1 = createTaskSpec(taId1, "dag1", "vertex1", 30,
+        RunExceptionProcessor.class.getName(),
+        TestOutputWithEvents.class.getName());
+
+    TezSharedExecutor sharedExecutor = new TezSharedExecutor(tezConf);
+    LogicalIOProcessorRuntimeTask lio =
+        new CleanupLogicalIOProcessorRuntimeTask(task1, 0, tezConf, null,
+            umbilical, serviceConsumerMetadata, new HashMap<String, String>(),
+            startedInputsMap, null, "", new ExecutionContextImpl("localhost"),
+            Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim(),
+            sharedExecutor);
+
+    TaskRunner2Callable runner =
+        new TaskRunner2Callable(lio, UserGroupInformation.getCurrentUser(), umbilical);
+
+    runner.call();
+
+    // The only umbilical call must be addEvents with an empty list
+    Mockito.verify(umbilical, Mockito.only()).addEvents(Collections.<TezEvent> emptyList());
+  }
+
   /**
    * We should expect no events being sent to the AM if an
    * exception happens in the close method of the processor
@@ -167,7 +211,7 @@ public class TestLogicalIOProcessorRuntimeTask {
 
     TezTaskAttemptID taId1 = createTaskAttemptID(vertexId, 1);
     TaskSpec task1 = createTaskSpec(taId1, "dag1", "vertex1", 30,
-        FaultyTestProcessor.class.getName(),
+        CloseExceptionProcessor.class.getName(),
         TestOutputWithEvents.class.getName());
 
     TezSharedExecutor sharedExecutor = new TezSharedExecutor(tezConf);
@@ -281,6 +325,31 @@ public class TestLogicalIOProcessorRuntimeTask {
     return TezDAGID.getInstance("2000", 100, 1);
   }
 
+  // Test double whose cleanup() misbehaves by trying to send an event.
+  private static class CleanupLogicalIOProcessorRuntimeTask
+      extends LogicalIOProcessorRuntimeTask {
+    CleanupLogicalIOProcessorRuntimeTask(TaskSpec taskSpec,
+        int appAttemptNumber, Configuration tezConf, String[] localDirs,
+        TezUmbilical tezUmbilical,
+        Map<String, ByteBuffer> serviceConsumerMetadata,
+        Map<String, String> envMap, Multimap<String, String> startedInputsMap,
+        ObjectRegistry objectRegistry, String pid,
+        org.apache.tez.runtime.api.ExecutionContext executionContext,
+        long memAvailable, boolean updateSysCounters, HadoopShim hadoopShim,
+        TezExecutors sharedExecutor) throws IOException {
+      super(taskSpec, appAttemptNumber, tezConf, localDirs, tezUmbilical,
+          serviceConsumerMetadata, envMap, startedInputsMap, objectRegistry,
+          pid, executionContext, memAvailable, updateSysCounters, hadoopShim,
+          sharedExecutor);
+    }
+
+    // Sends one composite data movement event through every output context.
+    @Override public void cleanup() throws InterruptedException {
+      getOutputContexts().forEach(context -> context.sendEvents(
+          Arrays.asList(CompositeDataMovementEvent.create(0, 0, null))));
+    }
+  }
+
   public static class TestProcessor extends AbstractLogicalIOProcessor {
 
     public static volatile int runCount = 0;
@@ -310,8 +379,30 @@ public class TestLogicalIOProcessorRuntimeTask {
 
   }
 
-  public static class FaultyTestProcessor extends TestProcessor {
-    public FaultyTestProcessor(ProcessorContext context) {
+  public static class RunExceptionProcessor
+      extends TestProcessor {
+
+    public RunExceptionProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    public void run(Map<String, LogicalInput> inputs,
+        Map<String, LogicalOutput> outputs)
+        throws Exception {
+      // This exception is thrown on purpose because we want to test this
+      throw new RuntimeException();
+    }
+
+    @Override
+    public void close() throws Exception {
+      // This exception is thrown because this method shouldn't be called
+      // if run has thrown an exception.
+      throw new RuntimeException();
+    }
+  }
+
+  public static class CloseExceptionProcessor extends TestProcessor {
+    public CloseExceptionProcessor(ProcessorContext context) {
       super(context);
     }