You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2014/09/18 21:49:52 UTC
[07/25] git commit: TEZ-1345. Add checks to guarantee all init events
are written to recovery to consider vertex initialized. (Jeff Zhang via
hitesh)
TEZ-1345. Add checks to guarantee all init events are written to recovery to consider vertex initialized. (Jeff Zhang via hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fb05aace
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fb05aace
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fb05aace
Branch: refs/heads/TEZ-8
Commit: fb05aace5062dc441da80f5ac8d4e322ab2a15ef
Parents: 16a0f57
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Sep 11 22:29:29 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Sep 11 22:29:29 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 7 +-
.../tez/dag/app/dag/impl/VertexManager.java | 28 ++++----
.../tez/dag/app/dag/impl/TestVertexManager.java | 76 ++++++++++++++++++++
.../org/apache/tez/test/TestDAGRecovery.java | 67 ++++++++++++++++-
.../apache/tez/test/dag/MultiAttemptDAG.java | 35 ++++++++-
6 files changed, 195 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/fb05aace/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 519aaa6..f29d48d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@ ALL CHANGES:
TEZ-1559. Add system tests for AM recovery.
TEZ-850. Recovery unit tests.
TEZ-853. Support counters recovery.
+ TEZ-1345. Add checks to guarantee all init events are written to recovery to consider vertex initialized.
Release 0.5.1: Unreleased
http://git-wip-us.apache.org/repos/asf/tez/blob/fb05aace/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 31240cb..6437e5b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -156,7 +156,6 @@ import org.apache.tez.runtime.api.impl.GroupInputSpec;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultiset;
@@ -2782,10 +2781,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventRootInputInitialized liInitEvent = (VertexEventRootInputInitialized) event;
VertexState state = vertex.getState();
if (state == VertexState.INITIALIZING) {
- vertex.vertexManager.onRootVertexInitialized(
+ List<TezEvent> inputInfoEvents =
+ vertex.vertexManager.onRootVertexInitialized(
liInitEvent.getInputName(),
vertex.getAdditionalInputs().get(liInitEvent.getInputName())
.getIODescriptor(), liInitEvent.getEvents());
+ if (inputInfoEvents != null && !inputInfoEvents.isEmpty()) {
+ ROUTE_EVENT_TRANSITION.transition(vertex, new VertexEventRouteEvent(vertex.vertexId, inputInfoEvents));
+ }
}
vertex.numInitializedInputs++;
http://git-wip-us.apache.org/repos/asf/tez/blob/fb05aace/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 177b946..c2ff660 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
@@ -44,7 +45,6 @@ import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.runtime.api.Event;
@@ -68,10 +68,11 @@ public class VertexManager {
VertexManagerPluginContextImpl pluginContext;
UserPayload payload = null;
AppContext appContext;
-
+ ConcurrentHashMap<String, List<TezEvent>> cachedRootInputEventMap;
+
class VertexManagerPluginContextImpl implements VertexManagerPluginContext {
// TODO Add functionality to allow VertexManagers to send VertexManagerEvents
-
+
private EventMetaData rootEventSourceMetadata = new EventMetaData(EventProducerConsumerType.INPUT,
managedVertex.getName(), "NULL_VERTEX", null);
private Map<String, EventMetaData> destinationEventMetadataMap = Maps.newHashMap();
@@ -80,7 +81,7 @@ public class VertexManager {
public Map<String, EdgeProperty> getInputVertexEdgeProperties() {
// TODO Something similar for Initial Inputs - payload etc visible
Map<Vertex, Edge> inputs = managedVertex.getInputVertices();
- Map<String, EdgeProperty> vertexEdgeMap =
+ Map<String, EdgeProperty> vertexEdgeMap =
Maps.newHashMapWithExpectedSize(inputs.size());
for (Map.Entry<Vertex, Edge> entry : inputs.entrySet()) {
vertexEdgeMap.put(entry.getKey().getName(), entry.getValue().getEdgeProperty());
@@ -115,7 +116,7 @@ public class VertexManager {
@Override
public Set<String> getVertexInputNames() {
Set<String> inputNames = null;
- Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
+ Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
inputs = managedVertex.getAdditionalInputs();
if (inputs != null) {
inputNames = inputs.keySet();
@@ -128,7 +129,6 @@ public class VertexManager {
return payload;
}
- @SuppressWarnings("unchecked")
@Override
public void addRootInputEvents(final String inputName,
Collection<InputDataInformationEvent> events) {
@@ -142,8 +142,8 @@ public class VertexManager {
return tezEvent;
}
});
- appContext.getEventHandler().handle(
- new VertexEventRouteEvent(managedVertex.getVertexId(), Lists.newArrayList(tezEvents)));
+
+ cachedRootInputEventMap.put(inputName,Lists.newArrayList(tezEvents));
// Recovery handling is taken care of by the Vertex.
}
@@ -201,7 +201,7 @@ public class VertexManager {
}
}
- public VertexManager(VertexManagerPluginDescriptor pluginDesc,
+ public VertexManager(VertexManagerPluginDescriptor pluginDesc,
Vertex managedVertex, AppContext appContext) {
checkNotNull(pluginDesc, "pluginDesc is null");
checkNotNull(managedVertex, "managedVertex is null");
@@ -209,12 +209,13 @@ public class VertexManager {
this.pluginDesc = pluginDesc;
this.managedVertex = managedVertex;
this.appContext = appContext;
+ this.cachedRootInputEventMap = new ConcurrentHashMap<String, List<TezEvent>>();
}
-
+
public VertexManagerPlugin getPlugin() {
return plugin;
}
-
+
public void initialize() {
pluginContext = new VertexManagerPluginContextImpl();
if (pluginDesc != null) {
@@ -246,7 +247,7 @@ public class VertexManager {
public void onSourceTaskCompleted(TezTaskID tezTaskId) {
Integer taskId = new Integer(tezTaskId.getId());
- String vertexName =
+ String vertexName =
appContext.getCurrentDAG().getVertex(tezTaskId.getVertexID()).getName();
plugin.onSourceTaskCompleted(vertexName, taskId);
}
@@ -255,8 +256,9 @@ public class VertexManager {
plugin.onVertexManagerEventReceived(vmEvent);
}
- public void onRootVertexInitialized(String inputName,
+ public List<TezEvent> onRootVertexInitialized(String inputName,
InputDescriptor inputDescriptor, List<Event> events) {
plugin.onRootVertexInitialized(inputName, inputDescriptor, events);
+ return cachedRootInputEventMap.get(inputName);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fb05aace/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
new file mode 100644
index 0000000..b3e66bc
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.junit.Test;
+
+public class TestVertexManager {
+
+ @Test
+ public void testOnRootVertexInitialized() {
+ Vertex mockVertex = mock(Vertex.class, RETURNS_DEEP_STUBS);
+ AppContext mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
+ doReturn("vertex1").when(mockVertex).getName();
+ when(
+ mockAppContext.getCurrentDAG().getVertex(any(String.class))
+ .getTotalTasks()).thenReturn(1);
+
+ VertexManager vm =
+ new VertexManager(
+ VertexManagerPluginDescriptor.create(RootInputVertexManager.class
+ .getName()), mockVertex, mockAppContext);
+ vm.initialize();
+ InputDescriptor id1 = mock(InputDescriptor.class);
+ List<Event> events1 = new LinkedList<Event>();
+ InputDataInformationEvent diEvent1 =
+ InputDataInformationEvent.createWithSerializedPayload(0, null);
+ events1.add(diEvent1);
+ List<TezEvent> tezEvents1 =
+ vm.onRootVertexInitialized("input1", id1, events1);
+ assertEquals(1, tezEvents1.size());
+ assertEquals(diEvent1, tezEvents1.get(0).getEvent());
+
+ InputDescriptor id2 = mock(InputDescriptor.class);
+ List<Event> events2 = new LinkedList<Event>();
+ InputDataInformationEvent diEvent2 =
+ InputDataInformationEvent.createWithSerializedPayload(0, null);
+ events2.add(diEvent2);
+ List<TezEvent> tezEvents2 =
+ vm.onRootVertexInitialized("input1", id2, events2);
+ assertEquals(tezEvents2.size(), 1);
+ assertEquals(diEvent2, tezEvents2.get(0).getEvent());
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/fb05aace/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
index c9acdc2..7676313 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
@@ -24,21 +24,30 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.client.TezClient;
+import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.DAGStatus.State;
+import org.apache.tez.dag.app.RecoveryParser;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.test.dag.MultiAttemptDAG;
import org.apache.tez.test.dag.MultiAttemptDAG.FailingInputInitializer;
import org.apache.tez.test.dag.MultiAttemptDAG.NoOpInput;
+import org.apache.tez.test.dag.MultiAttemptDAG.TestRootInputInitializer;
import org.apache.tez.test.dag.SimpleVTestDAG;
import org.junit.After;
import org.junit.AfterClass;
@@ -48,6 +57,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
+import java.util.List;
import java.util.Random;
public class TestDAGRecovery {
@@ -61,6 +71,7 @@ public class TestDAGRecovery {
private static MiniDFSCluster dfsCluster = null;
private static TezClient tezSession = null;
private static FileSystem remoteFs = null;
+ private static TezConfiguration tezConf = null;
@BeforeClass
public static void beforeClass() throws Exception {
@@ -120,7 +131,7 @@ public class TestDAGRecovery {
.valueOf(new Random().nextInt(100000))));
TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
- TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
+ tezConf = new TezConfiguration(miniTezCluster.getConfig());
tezConf.setInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, 0);
tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "DEBUG");
tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
@@ -130,6 +141,7 @@ public class TestDAGRecovery {
tezConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 500);
tezConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, " -Xmx256m");
tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
+ tezConf.set(TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE, "false");
tezSession = TezClient.create("TestDAGRecovery", tezConf);
tezSession.start();
@@ -165,13 +177,64 @@ public class TestDAGRecovery {
Assert.assertEquals(finalState, dagStatus.getState());
}
+ private void verifyRecoveryLog() throws IOException{
+ ApplicationId appId = tezSession.getAppMasterApplicationId();
+ Path tezSystemStagingDir = TezCommonUtils.getTezSystemStagingPath(tezConf, appId.toString());
+ Path recoveryDataDir = TezCommonUtils.getRecoveryPath(tezSystemStagingDir, tezConf);
+
+ FileSystem fs = tezSystemStagingDir.getFileSystem(tezConf);
+ for (int i=1; i<=3; ++i) {
+ Path currentAttemptRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir,i);
+ Path recoveryFilePath = new Path(currentAttemptRecoveryDataDir,
+ appId.toString().replace("application", "dag") + "_1" + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
+ List<HistoryEvent> historyEvents = RecoveryParser.parseDAGRecoveryFile(
+ fs.open(recoveryFilePath));
+
+ int inputInfoEventIndex = -1;
+ int vertexInitedEventIndex = -1;
+ for (int j=0;j<historyEvents.size(); ++j) {
+ HistoryEvent historyEvent = historyEvents.get(j);
+ LOG.info("Parsed event from recovery stream"
+ + ", eventType=" + historyEvent.getEventType()
+ + ", event=" + historyEvent);
+ if (historyEvent.getEventType() == HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED) {
+ VertexDataMovementEventsGeneratedEvent dmEvent =
+ (VertexDataMovementEventsGeneratedEvent)historyEvent;
+ // TODO: the inputInfoEventIndex == -1 check below will no longer be needed once TEZ-1521 is resolved
+ if (dmEvent.getVertexID().getId() == 0 && inputInfoEventIndex == -1) {
+ inputInfoEventIndex = j;
+ }
+ }
+ if (historyEvent.getEventType() == HistoryEventType.VERTEX_INITIALIZED) {
+ VertexInitializedEvent vInitedEvent = (VertexInitializedEvent) historyEvent;
+ if (vInitedEvent.getVertexID().getId() == 0) {
+ vertexInitedEventIndex = j;
+ }
+ }
+ }
+ // v1's init events must be logged before its VertexInitializedEvent (TEZ-1345)
+ Assert.assertTrue("can not find VERTEX_DATA_MOVEMENT_EVENTS_GENERATED for v1", inputInfoEventIndex != -1);
+ Assert.assertTrue("can not find VERTEX_INITIALIZED for v1", vertexInitedEventIndex != -1);
+ Assert.assertTrue("VERTEX_DATA_MOVEMENT_EVENTS_GENERATED is logged before VERTEX_INITIALIZED for v1",
+ inputInfoEventIndex < vertexInitedEventIndex);
+ }
+ }
+
@Test(timeout=120000)
public void testBasicRecovery() throws Exception {
DAG dag = MultiAttemptDAG.createDAG("TestBasicRecovery", null);
+ // add input to v1 to make sure that there will be init events for v1 (TEZ-1345)
+ DataSourceDescriptor dataSource =
+ DataSourceDescriptor.create(InputDescriptor.create(NoOpInput.class.getName()),
+ InputInitializerDescriptor.create(TestRootInputInitializer.class.getName()), null);
+ dag.getVertex("v1").addDataSource("Input", dataSource);
+
runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+ verifyRecoveryLog();
+
// it should fail if submitting same dags in recovery mode (TEZ-1064)
- try{
+ try {
DAGClient dagClient = tezSession.submitDAG(dag);
Assert.fail("Expected DAG submit to fail on duplicate dag name");
} catch (TezException e) {
http://git-wip-us.apache.org/repos/asf/tez/blob/fb05aace/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
index 7fc9ad7..58b9413 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import java.nio.ByteBuffer;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -53,12 +54,15 @@ import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.InputInitializer;
import org.apache.tez.runtime.api.InputInitializerContext;
import org.apache.tez.runtime.api.Writer;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.test.TestInput;
import org.apache.tez.test.TestOutput;
import org.apache.tez.test.TestProcessor;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -148,8 +152,15 @@ public class MultiAttemptDAG {
}
@Override
- public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List<Event> events) {
- // Do nothing
+ public void onRootVertexInitialized(String inputName,
+ InputDescriptor inputDescriptor, List<Event> events) {
+ List<InputDataInformationEvent> inputInfoEvents = new ArrayList<InputDataInformationEvent>();
+ for (Event event: events) {
+ if (event instanceof InputDataInformationEvent) {
+ inputInfoEvents.add((InputDataInformationEvent)event);
+ }
+ }
+ getContext().addRootInputEvents(inputName, inputInfoEvents);
}
}
@@ -214,6 +225,26 @@ public class MultiAttemptDAG {
}
}
+ public static class TestRootInputInitializer extends InputInitializer {
+
+ public TestRootInputInitializer(InputInitializerContext initializerContext) {
+ super(initializerContext);
+ }
+
+ @Override
+ public List<Event> initialize() throws Exception {
+ List<Event> events = new ArrayList<Event>();
+ events.add(InputDataInformationEvent.createWithSerializedPayload(0, ByteBuffer.allocate(0)));
+ return events;
+ }
+
+ @Override
+ public void handleInputInitializerEvent(List<InputInitializerEvent> events)
+ throws Exception {
+ throw new UnsupportedOperationException("Not supported");
+ }
+ }
+
public static class FailingInputInitializer extends InputInitializer {
public FailingInputInitializer(InputInitializerContext initializerContext) {