You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/09/11 20:16:59 UTC

git commit: Revert due to build failure. "TEZ-1345. Add checks to guarantee all init events are written to recovery to consider vertex initialized. (Jeff Zhang via hitesh)"

Repository: tez
Updated Branches:
  refs/heads/master 2bdac2749 -> d6589d3ac


Revert due to build failure. "TEZ-1345. Add checks to guarantee all init events are written to recovery to consider vertex initialized. (Jeff Zhang via hitesh)"

This reverts commit 2bdac27495f6899b766e537576e7edf6df53c7a3.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d6589d3a
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d6589d3a
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d6589d3a

Branch: refs/heads/master
Commit: d6589d3accf01ce77adea74b8595f70431611563
Parents: 2bdac27
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Sep 11 11:13:20 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Sep 11 11:13:20 2014 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  7 +-
 .../tez/dag/app/dag/impl/VertexManager.java     | 28 ++++----
 .../tez/dag/app/dag/impl/TestVertexManager.java | 76 --------------------
 .../org/apache/tez/test/TestDAGRecovery.java    | 67 +----------------
 .../apache/tez/test/dag/MultiAttemptDAG.java    | 35 +--------
 5 files changed, 20 insertions(+), 193 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/d6589d3a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 670f445..ff556ba 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -156,6 +156,7 @@ import org.apache.tez.runtime.api.impl.GroupInputSpec;
 import org.apache.tez.runtime.api.impl.InputSpec;
 import org.apache.tez.runtime.api.impl.OutputSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultiset;
@@ -2761,12 +2762,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       VertexEventRootInputInitialized liInitEvent = (VertexEventRootInputInitialized) event;
       VertexState state = vertex.getState();
       if (state == VertexState.INITIALIZING) {
-        List<TezEvent> inputInfoEvents =
-            vertex.vertexManager.onRootVertexInitialized(
+        vertex.vertexManager.onRootVertexInitialized(
             liInitEvent.getInputName(),
             vertex.getAdditionalInputs().get(liInitEvent.getInputName())
                 .getIODescriptor(), liInitEvent.getEvents());
-        ROUTE_EVENT_TRANSITION.transition(vertex, new VertexEventRouteEvent(vertex.vertexId, inputInfoEvents));
       }
 
       vertex.numInitializedInputs++;
@@ -2907,7 +2906,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         + " initWaitsForRootInitializers: " + initWaitsForRootInitializers);
     return false;
   }
-
+  
   public static class StartWhileInitializingTransition implements 
     SingleArcTransition<VertexImpl, VertexEvent> {
 

http://git-wip-us.apache.org/repos/asf/tez/blob/d6589d3a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index c2ff660..177b946 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -24,7 +24,6 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 
 import javax.annotation.Nullable;
 
@@ -45,6 +44,7 @@ import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.runtime.api.Event;
@@ -68,11 +68,10 @@ public class VertexManager {
   VertexManagerPluginContextImpl pluginContext;
   UserPayload payload = null;
   AppContext appContext;
-  ConcurrentHashMap<String, List<TezEvent>> cachedRootInputEventMap;
-
+    
   class VertexManagerPluginContextImpl implements VertexManagerPluginContext {
     // TODO Add functionality to allow VertexManagers to send VertexManagerEvents
-
+    
     private EventMetaData rootEventSourceMetadata = new EventMetaData(EventProducerConsumerType.INPUT,
         managedVertex.getName(), "NULL_VERTEX", null);
     private Map<String, EventMetaData> destinationEventMetadataMap = Maps.newHashMap();
@@ -81,7 +80,7 @@ public class VertexManager {
     public Map<String, EdgeProperty> getInputVertexEdgeProperties() {
       // TODO Something similar for Initial Inputs - payload etc visible
       Map<Vertex, Edge> inputs = managedVertex.getInputVertices();
-      Map<String, EdgeProperty> vertexEdgeMap =
+      Map<String, EdgeProperty> vertexEdgeMap = 
                           Maps.newHashMapWithExpectedSize(inputs.size());
       for (Map.Entry<Vertex, Edge> entry : inputs.entrySet()) {
         vertexEdgeMap.put(entry.getKey().getName(), entry.getValue().getEdgeProperty());
@@ -116,7 +115,7 @@ public class VertexManager {
     @Override
     public Set<String> getVertexInputNames() {
       Set<String> inputNames = null;
-      Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
+      Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> 
           inputs = managedVertex.getAdditionalInputs();
       if (inputs != null) {
         inputNames = inputs.keySet();
@@ -129,6 +128,7 @@ public class VertexManager {
       return payload;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public void addRootInputEvents(final String inputName,
         Collection<InputDataInformationEvent> events) {
@@ -142,8 +142,8 @@ public class VertexManager {
               return tezEvent;
             }
           });
-
-      cachedRootInputEventMap.put(inputName,Lists.newArrayList(tezEvents));
+      appContext.getEventHandler().handle(
+          new VertexEventRouteEvent(managedVertex.getVertexId(), Lists.newArrayList(tezEvents)));
       // Recovery handling is taken care of by the Vertex.
     }
 
@@ -201,7 +201,7 @@ public class VertexManager {
     }
   }
 
-  public VertexManager(VertexManagerPluginDescriptor pluginDesc,
+  public VertexManager(VertexManagerPluginDescriptor pluginDesc, 
       Vertex managedVertex, AppContext appContext) {
     checkNotNull(pluginDesc, "pluginDesc is null");
     checkNotNull(managedVertex, "managedVertex is null");
@@ -209,13 +209,12 @@ public class VertexManager {
     this.pluginDesc = pluginDesc;
     this.managedVertex = managedVertex;
     this.appContext = appContext;
-    this.cachedRootInputEventMap = new ConcurrentHashMap<String, List<TezEvent>>();
   }
-
+  
   public VertexManagerPlugin getPlugin() {
     return plugin;
   }
-
+  
   public void initialize() {
     pluginContext = new VertexManagerPluginContextImpl();
     if (pluginDesc != null) {
@@ -247,7 +246,7 @@ public class VertexManager {
 
   public void onSourceTaskCompleted(TezTaskID tezTaskId) {
     Integer taskId = new Integer(tezTaskId.getId());
-    String vertexName =
+    String vertexName = 
         appContext.getCurrentDAG().getVertex(tezTaskId.getVertexID()).getName();
     plugin.onSourceTaskCompleted(vertexName, taskId);
   }
@@ -256,9 +255,8 @@ public class VertexManager {
     plugin.onVertexManagerEventReceived(vmEvent);
   }
 
-  public List<TezEvent> onRootVertexInitialized(String inputName,
+  public void onRootVertexInitialized(String inputName, 
       InputDescriptor inputDescriptor, List<Event> events) {
     plugin.onRootVertexInitialized(inputName, inputDescriptor, events);
-    return cachedRootInputEventMap.get(inputName);
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/d6589d3a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
deleted file mode 100644
index b3e66bc..0000000
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.dag.impl;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.events.InputDataInformationEvent;
-import org.apache.tez.runtime.api.impl.TezEvent;
-import org.junit.Test;
-
-public class TestVertexManager {
-
-  @Test
-  public void testOnRootVertexInitialized() {
-    Vertex mockVertex = mock(Vertex.class, RETURNS_DEEP_STUBS);
-    AppContext mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
-    doReturn("vertex1").when(mockVertex).getName();
-    when(
-        mockAppContext.getCurrentDAG().getVertex(any(String.class))
-            .getTotalTasks()).thenReturn(1);
-
-    VertexManager vm =
-        new VertexManager(
-            VertexManagerPluginDescriptor.create(RootInputVertexManager.class
-                .getName()), mockVertex, mockAppContext);
-    vm.initialize();
-    InputDescriptor id1 = mock(InputDescriptor.class);
-    List<Event> events1 = new LinkedList<Event>();
-    InputDataInformationEvent diEvent1 =
-        InputDataInformationEvent.createWithSerializedPayload(0, null);
-    events1.add(diEvent1);
-    List<TezEvent> tezEvents1 =
-        vm.onRootVertexInitialized("input1", id1, events1);
-    assertEquals(1, tezEvents1.size());
-    assertEquals(diEvent1, tezEvents1.get(0).getEvent());
-
-    InputDescriptor id2 = mock(InputDescriptor.class);
-    List<Event> events2 = new LinkedList<Event>();
-    InputDataInformationEvent diEvent2 =
-        InputDataInformationEvent.createWithSerializedPayload(0, null);
-    events2.add(diEvent2);
-    List<TezEvent> tezEvents2 =
-        vm.onRootVertexInitialized("input1", id2, events2);
-    assertEquals(tezEvents2.size(), 1);
-    assertEquals(diEvent2, tezEvents2.get(0).getEvent());
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/d6589d3a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
index 7676313..c9acdc2 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
@@ -24,30 +24,21 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezClient;
-import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.DAGStatus.State;
-import org.apache.tez.dag.app.RecoveryParser;
-import org.apache.tez.dag.history.HistoryEvent;
-import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
-import org.apache.tez.dag.history.events.VertexInitializedEvent;
 import org.apache.tez.test.dag.MultiAttemptDAG;
 import org.apache.tez.test.dag.MultiAttemptDAG.FailingInputInitializer;
 import org.apache.tez.test.dag.MultiAttemptDAG.NoOpInput;
-import org.apache.tez.test.dag.MultiAttemptDAG.TestRootInputInitializer;
 import org.apache.tez.test.dag.SimpleVTestDAG;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -57,7 +48,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.Random;
 
 public class TestDAGRecovery {
@@ -71,7 +61,6 @@ public class TestDAGRecovery {
   private static MiniDFSCluster dfsCluster = null;
   private static TezClient tezSession = null;
   private static FileSystem remoteFs = null;
-  private static TezConfiguration tezConf = null;
 
   @BeforeClass
   public static void beforeClass() throws Exception {
@@ -131,7 +120,7 @@ public class TestDAGRecovery {
         .valueOf(new Random().nextInt(100000))));
     TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
 
-    tezConf = new TezConfiguration(miniTezCluster.getConfig());
+    TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
     tezConf.setInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, 0);
     tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "DEBUG");
     tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
@@ -141,7 +130,6 @@ public class TestDAGRecovery {
     tezConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 500);
     tezConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, " -Xmx256m");
     tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
-    tezConf.set(TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE, "false");
 
     tezSession = TezClient.create("TestDAGRecovery", tezConf);
     tezSession.start();
@@ -177,64 +165,13 @@ public class TestDAGRecovery {
     Assert.assertEquals(finalState, dagStatus.getState());
   }
 
-  private void verifyRecoveryLog() throws IOException{
-    ApplicationId appId = tezSession.getAppMasterApplicationId();
-    Path tezSystemStagingDir = TezCommonUtils.getTezSystemStagingPath(tezConf, appId.toString());
-    Path recoveryDataDir = TezCommonUtils.getRecoveryPath(tezSystemStagingDir, tezConf);
-
-    FileSystem fs = tezSystemStagingDir.getFileSystem(tezConf);
-    for (int i=1; i<=3; ++i) {
-      Path currentAttemptRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir,i);
-      Path recoveryFilePath = new Path(currentAttemptRecoveryDataDir,
-      appId.toString().replace("application", "dag") + "_1" + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
-      List<HistoryEvent> historyEvents = RecoveryParser.parseDAGRecoveryFile(
-          fs.open(recoveryFilePath));
-
-      int inputInfoEventIndex = -1;
-      int vertexInitedEventIndex = -1;
-      for (int j=0;j<historyEvents.size(); ++j) {
-        HistoryEvent historyEvent = historyEvents.get(j);
-        LOG.info("Parsed event from recovery stream"
-            + ", eventType=" + historyEvent.getEventType()
-            + ", event=" + historyEvent);
-        if (historyEvent.getEventType() ==  HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED) {
-          VertexDataMovementEventsGeneratedEvent dmEvent =
-              (VertexDataMovementEventsGeneratedEvent)historyEvent;
-          // TODO do not need to check whether it is -1 after Tez-1521 is resolved
-          if (dmEvent.getVertexID().getId() == 0 && inputInfoEventIndex == -1) {
-            inputInfoEventIndex = j;
-          }
-        }
-        if (historyEvent.getEventType() == HistoryEventType.VERTEX_INITIALIZED) {
-          VertexInitializedEvent vInitedEvent = (VertexInitializedEvent) historyEvent;
-          if (vInitedEvent.getVertexID().getId() == 0) {
-            vertexInitedEventIndex = j;
-          }
-        }
-      }
-      // v1's init events must be logged before its VertexInitializedEvent (Tez-1345)
-      Assert.assertTrue("can not find VERTEX_DATA_MOVEMENT_EVENTS_GENERATED for v1", inputInfoEventIndex != -1);
-      Assert.assertTrue("can not find VERTEX_INITIALIZED for v1", vertexInitedEventIndex != -1);
-      Assert.assertTrue("VERTEX_DATA_MOVEMENT_EVENTS_GENERATED is logged before VERTEX_INITIALIZED for v1",
-          inputInfoEventIndex < vertexInitedEventIndex);
-    }
-  }
-
   @Test(timeout=120000)
   public void testBasicRecovery() throws Exception {
     DAG dag = MultiAttemptDAG.createDAG("TestBasicRecovery", null);
-    // add input to v1 to make sure that there will be init events for v1 (TEZ-1345)
-    DataSourceDescriptor dataSource =
-        DataSourceDescriptor.create(InputDescriptor.create(NoOpInput.class.getName()),
-           InputInitializerDescriptor.create(TestRootInputInitializer.class.getName()), null);
-    dag.getVertex("v1").addDataSource("Input", dataSource);
-
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
 
-    verifyRecoveryLog();
-
     // it should fail if submitting same dags in recovery mode (TEZ-1064)
-    try {
+    try{
       DAGClient dagClient = tezSession.submitDAG(dag);
       Assert.fail("Expected DAG submit to fail on duplicate dag name");
     } catch (TezException e) {

http://git-wip-us.apache.org/repos/asf/tez/blob/d6589d3a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
index 58b9413..7fc9ad7 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
@@ -22,7 +22,6 @@ import com.google.common.collect.Lists;
 import com.google.common.primitives.Ints;
 
 import java.nio.ByteBuffer;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -54,15 +53,12 @@ import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.api.InputInitializer;
 import org.apache.tez.runtime.api.InputInitializerContext;
 import org.apache.tez.runtime.api.Writer;
-import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
-import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.test.TestInput;
 import org.apache.tez.test.TestOutput;
 import org.apache.tez.test.TestProcessor;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -152,15 +148,8 @@ public class MultiAttemptDAG {
     }
 
     @Override
-    public void onRootVertexInitialized(String inputName,
-        InputDescriptor inputDescriptor, List<Event> events) {
-      List<InputDataInformationEvent> inputInfoEvents = new ArrayList<InputDataInformationEvent>();
-      for (Event event: events) {
-        if (event instanceof InputDataInformationEvent) {
-          inputInfoEvents.add((InputDataInformationEvent)event);
-        }
-      }
-      getContext().addRootInputEvents(inputName, inputInfoEvents);
+    public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List<Event> events) {
+      // Do nothing
     }
   }
 
@@ -225,26 +214,6 @@ public class MultiAttemptDAG {
     }
   }
 
-  public static class TestRootInputInitializer extends InputInitializer {
-
-    public TestRootInputInitializer(InputInitializerContext initializerContext) {
-      super(initializerContext);
-    }
-
-    @Override
-    public List<Event> initialize() throws Exception {
-      List<Event> events = new ArrayList<Event>();
-      events.add(InputDataInformationEvent.createWithSerializedPayload(0, ByteBuffer.allocate(0)));
-      return events;
-    }
-
-    @Override
-    public void handleInputInitializerEvent(List<InputInitializerEvent> events)
-        throws Exception {
-      throw new UnsupportedOperationException("Not supported");
-    }
-  }
-
   public static class FailingInputInitializer extends InputInitializer {
 
     public FailingInputInitializer(InputInitializerContext initializerContext) {