You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2021/01/27 23:51:28 UTC
[tez] branch master updated: TEZ-3985: Correctness: Throw a clear
exception for DMEs sent during cleanup (Contributed by Jaume M,
reviewed by Rajesh Balamohan, Ashutosh Chauhan)
This is an automated email from the ASF dual-hosted git repository.
rbalamohan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new 04597c7 TEZ-3985: Correctness: Throw a clear exception for DMEs sent during cleanup (Contributed by Jaume M, reviewed by Rajesh Balamohan, Ashutosh Chauhan)
04597c7 is described below
commit 04597c7b591917bbe995a9e182c3be60e33816c9
Author: Rajesh Balamohan <rb...@apache.org>
AuthorDate: Thu Jan 28 05:20:25 2021 +0530
TEZ-3985: Correctness: Throw a clear exception for DMEs sent during cleanup (Contributed by Jaume M, reviewed by Rajesh Balamohan, Ashutosh Chauhan)
---
.../org/apache/tez/runtime/api/OutputContext.java | 10 +++
.../tez/runtime/api/impl/TezOutputContextImpl.java | 30 ++++++-
.../tez/runtime/internals/api/TezTrapEvent.java | 52 ++++++++++++
.../runtime/internals/api/TezTrapEventType.java | 29 +++++++
.../tez/runtime/task/TaskRunner2Callable.java | 12 ++-
.../apache/tez/runtime/task/TezTaskRunner2.java | 3 +-
.../tez/runtime/task/TezTrapEventHandler.java | 92 ++++++++++++++++++++
.../runtime/TestLogicalIOProcessorRuntimeTask.java | 97 +++++++++++++++++++++-
8 files changed, 318 insertions(+), 7 deletions(-)
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java
index 882eb4b..33fe772 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java
@@ -19,6 +19,7 @@
package org.apache.tez.runtime.api;
import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.yarn.event.EventHandler;
/**
* Context handle for the Output to initialize itself.
@@ -48,4 +49,13 @@ public interface OutputContext extends TaskContext {
*/
public OutputStatisticsReporter getStatisticsReporter();
+ /**
+ * Notify the context that events must no longer be delivered to the AM
+ * directly. This is a safety measure against events being sent after close
+ * or during cleanup: once this has been called, events that would otherwise
+ * be queued to be sent to the AM are passed to {@code eventHandler} instead,
+ * and the handler decides what happens to them.
+ * <p>
+ * NOTE(review): the handler is a raw {@link EventHandler}; the runtime
+ * implementation in this change hands it a {@code TezTrapEvent} holding the
+ * intercepted events — confirm before relying on that type. Adding an
+ * abstract method here also breaks any implementation of this interface
+ * outside Tez; consider whether a default method is needed.
+ * @param eventHandler receives the events sent after this call.
+ */
+ void trapEvents(EventHandler eventHandler);
+
}
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
index ec8280a..20ec062 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -29,7 +29,9 @@ import javax.annotation.Nullable;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.TezExecutors;
+import org.apache.tez.runtime.internals.api.TezTrapEvent;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
@@ -47,17 +49,28 @@ import org.apache.tez.runtime.common.resources.MemoryDistributor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@SuppressWarnings("unchecked")
public class TezOutputContextImpl extends TezTaskContextImpl
implements OutputContext {
private static final Logger LOG = LoggerFactory.getLogger(TezOutputContextImpl.class);
private volatile UserPayload userPayload;
+
+ /**
+ * Holds whether we can accept more events to send to the AM.
+ */
+ private volatile boolean trapEvents;
private final String destinationVertexName;
private final EventMetaData sourceInfo;
private final int outputIndex;
private final OutputStatisticsReporterImpl statsReporter;
+ /**
+ * Handler for the events after the trap flag is set.
+ */
+ private EventHandler<TezTrapEvent> trapEventHandler;
+
class OutputStatisticsReporterImpl implements OutputStatisticsReporter {
@Override
@@ -71,7 +84,7 @@ public class TezOutputContextImpl extends TezTaskContextImpl
public void reportItemsProcessed(long items) {
// this is a concurrent map. Plus we are not adding/deleting entries
runtimeTask.getTaskStatistics().getIOStatistics().get(destinationVertexName)
- .setItemsProcessed(items);;
+ .setItemsProcessed(items);
}
}
@@ -124,7 +137,11 @@ public class TezOutputContextImpl extends TezTaskContextImpl
TezEvent tEvt = new TezEvent(e, sourceInfo);
tezEvents.add(tEvt);
}
- tezUmbilical.addEvents(tezEvents);
+ if (trapEvents) {
+ trapEventHandler.handle(new TezTrapEvent(tezEvents));
+ } else {
+ tezUmbilical.addEvents(tezEvents);
+ }
}
@Override
@@ -163,6 +180,15 @@ public class TezOutputContextImpl extends TezTaskContextImpl
return statsReporter;
}
+  /**
+   * Starts trapping events: from now on, events that would be queued to be
+   * sent to the AM are handed to {@code eventHandler} instead of the umbilical.
+   *
+   * @param eventHandler receives the trapped events; must not be null.
+   * @throws IllegalArgumentException if {@code eventHandler} is null.
+   */
+  @Override
+  public final void trapEvents(final EventHandler eventHandler) {
+    if (eventHandler == null) {
+      throw new IllegalArgumentException("eventHandler must not be null");
+    }
+    // Publish the handler BEFORE raising the volatile flag. The volatile
+    // write/read pair then guarantees that any thread observing
+    // trapEvents == true also observes the handler. With the opposite order,
+    // a concurrent sender could see the flag set while trapEventHandler is
+    // still null and fail with a NullPointerException.
+    this.trapEventHandler = eventHandler;
+    trapEvents = true;
+  }
+
@Override
public void close() throws IOException {
super.close();
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TezTrapEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TezTrapEvent.java
new file mode 100644
index 0000000..8c63b74
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TezTrapEvent.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.internals.api;
+
+import org.apache.tez.common.TezAbstractEvent;
+import org.apache.tez.runtime.api.impl.TezEvent;
+
+import java.util.List;
+
+import static org.apache.tez.runtime.internals.api.TezTrapEventType.TRAP_EVENT_TYPE;
+
+/**
+ * Carries the events an output tried to send to the AM after its context
+ * started trapping events, so that a trap handler can decide their fate.
+ */
+public class TezTrapEvent extends TezAbstractEvent<TezTrapEventType> {
+
+  /** Intercepted events, stored exactly as supplied (not copied). */
+  private final List<TezEvent> tezEvents;
+
+  /**
+   * Builds a trap event of type {@link TezTrapEventType#TRAP_EVENT_TYPE}.
+   *
+   * @param events events tried to be sent to the AM.
+   */
+  public TezTrapEvent(final List<TezEvent> events) {
+    super(TRAP_EVENT_TYPE);
+    tezEvents = events;
+  }
+
+  /**
+   * Returns the intercepted events.
+   *
+   * @return the list given to the constructor (same instance).
+   */
+  public final List<TezEvent> getTezEvents() {
+    return tezEvents;
+  }
+}
\ No newline at end of file
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TezTrapEventType.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TezTrapEventType.java
new file mode 100644
index 0000000..89cb78e
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TezTrapEventType.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.internals.api;
+
+/**
+ * Type tag for {@link TezTrapEvent}; the trap mechanism needs exactly one.
+ */
+public enum TezTrapEventType {
+  /** The sole trap event type. */
+  TRAP_EVENT_TYPE
+}
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
index 0e6dfda..810a806 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
@@ -24,6 +24,7 @@ import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.api.impl.TezUmbilical;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,10 +47,16 @@ public class TaskRunner2Callable extends CallableWithNdc<TaskRunner2Callable.Tas
private volatile Thread ownThread;
+ /**
+ * Protocol to send the events.
+ * It is passed to the TezTrapEventHandler installed on every output context
+ * just before task cleanup, so that events sent during cleanup can still be
+ * filtered and forwarded to the AM.
+ */
+ private final TezUmbilical tezUmbilical;
+
public TaskRunner2Callable(LogicalIOProcessorRuntimeTask task,
- UserGroupInformation ugi) {
+ final UserGroupInformation ugi, final TezUmbilical umbilical) { // umbilical: channel for events trapped during cleanup
this.task = task;
this.ugi = ugi;
+ this.tezUmbilical = umbilical; // later handed to each TezTrapEventHandler
}
@Override
@@ -109,6 +116,9 @@ public class TaskRunner2Callable extends CallableWithNdc<TaskRunner2Callable.Tas
// For a successful task, however, this should be almost no delay since close has already happened.
maybeFixInterruptStatus();
LOG.info("Cleaning up task {}, stopRequested={}", task.getTaskAttemptID(), stopRequested.get());
+ task.getOutputContexts().forEach(outputContext
+ -> outputContext.trapEvents(new TezTrapEventHandler(outputContext,
+ this.tezUmbilical)));
task.cleanup();
}
}
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
index ae81769..bbf037b 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
@@ -177,7 +177,8 @@ public class TezTaskRunner2 {
// Safe to do this within a synchronized block because we're providing
// the handler on which the Reporter will communicate back. Assuming
// the register call doesn't end up hanging.
- taskRunnerCallable = new TaskRunner2Callable(task, ugi);
+ taskRunnerCallable = new TaskRunner2Callable(task, ugi,
+ umbilicalAndErrorHandler);
taskReporter.registerTask(task, umbilicalAndErrorHandler);
future = executor.submit(taskRunnerCallable);
}
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTrapEventHandler.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTrapEventHandler.java
new file mode 100644
index 0000000..b35dbb0
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTrapEventHandler.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.tez.runtime.task;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
+import org.apache.tez.runtime.api.impl.TezUmbilical;
+import org.apache.tez.runtime.internals.api.TezTrapEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Handles the events an output context traps once trapping has been
+ * activated (just before task cleanup). Data movement events must all have
+ * been sent before this point, so sending one now is a bug: such events are
+ * dropped and logged as errors. Every other event type is still forwarded to
+ * the AM through the umbilical. If data movement events do arrive here, the
+ * task has most likely failed and will be retried.
+ */
+public class TezTrapEventHandler implements EventHandler<TezTrapEvent> {
+  /**
+   * Logger for this handler.
+   */
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TezTrapEventHandler.class);
+
+  /**
+   * Output context that will report the events (currently not read by this
+   * class).
+   */
+  private final OutputContext outputContext;
+
+  /**
+   * Protocol used to forward the surviving events to the AM.
+   */
+  private final TezUmbilical tezUmbilical;
+
+  /**
+   * @param output context that will report the events.
+   * @param umbilical used to send the events to the AM.
+   */
+  TezTrapEventHandler(final OutputContext output,
+      final TezUmbilical umbilical) {
+    this.outputContext = output;
+    this.tezUmbilical = umbilical;
+  }
+
+  /**
+   * Drops trapped data movement events, logging each one as an error. All
+   * remaining events are forwarded to the AM in a single
+   * {@link TezUmbilical#addEvents} call. That call is made even when every
+   * event was dropped, in which case an empty list is forwarded.
+   *
+   * @param tezTrapEvent event holding the tez events.
+   * @throws IllegalArgumentException if the event carries no event list.
+   */
+  @Override
+  public final void handle(final TezTrapEvent tezTrapEvent) {
+    final List<TezEvent> trapped = tezTrapEvent.getTezEvents();
+    Preconditions.checkArgument(trapped != null,
+        "TezTrapEvent must carry a non-null list of events");
+    final List<TezEvent> toForward = new ArrayList<>(trapped.size());
+    for (TezEvent tezEvent : trapped) {
+      switch (tezEvent.getEventType()) {
+      case COMPOSITE_DATA_MOVEMENT_EVENT:
+      case DATA_MOVEMENT_EVENT:
+        String errorMsg = "Some events won't be sent to the AM because all"
+            + " the events should have been sent at this point. Most likely"
+            + " this would result in a bug. "
+            + " event:" + tezEvent;
+        // Never thrown: the Throwable only records the stack trace of the
+        // code that sent the event. That code is still on the stack when
+        // handle() is invoked synchronously from the sending call, as
+        // TezOutputContextImpl does.
+        Throwable throwable = new Throwable(errorMsg);
+        LOG.error(errorMsg, throwable);
+        break;
+      default:
+        LOG.info("Event of type {} will be sent to the AM after the task was closed ",
+            tezEvent.getEventType());
+        toForward.add(tezEvent);
+      }
+    }
+    tezUmbilical.addEvents(toForward);
+  }
+}
\ No newline at end of file
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
index 599f98f..ba9a66d 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
@@ -24,13 +24,18 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tez.common.TezExecutors;
import org.apache.tez.common.TezSharedExecutor;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
@@ -41,12 +46,15 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
+import org.apache.tez.hadoop.shim.HadoopShim;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.AbstractLogicalInput;
import org.apache.tez.runtime.api.AbstractLogicalOutput;
import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ObjectRegistry;
import org.apache.tez.runtime.api.Reader;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.OutputContext;
@@ -57,13 +65,16 @@ import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.TezUmbilical;
import org.apache.tez.runtime.common.resources.ScalingAllocator;
+import org.apache.tez.runtime.task.TaskRunner2Callable;
import org.junit.Test;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
+
import org.mockito.Mockito;
public class TestLogicalIOProcessorRuntimeTask {
@@ -149,6 +160,39 @@ public class TestLogicalIOProcessorRuntimeTask {
}
+  /**
+   * A data movement event sent from task cleanup must be trapped and dropped
+   * instead of reaching the AM.
+   */
+  @Test
+  public void testEventsCantBeSentInCleanup() throws Exception {
+    TezDAGID dagId = createTezDagId();
+    TezVertexID vertexId = createTezVertexId(dagId);
+    Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<>();
+    Multimap<String, String> startedInputsMap = HashMultimap.create();
+    TezUmbilical umbilical = mock(TezUmbilical.class);
+    TezConfiguration tezConf = new TezConfiguration();
+    tezConf.set(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS,
+        ScalingAllocator.class.getName());
+
+    TezTaskAttemptID taId1 = createTaskAttemptID(vertexId, 1);
+    // RunExceptionProcessor fails in run(); the runner is expected to still
+    // perform task cleanup afterwards.
+    TaskSpec task1 = createTaskSpec(taId1, "dag1", "vertex1", 30,
+        RunExceptionProcessor.class.getName(),
+        TestOutputWithEvents.class.getName());
+
+    TezSharedExecutor sharedExecutor = new TezSharedExecutor(tezConf);
+    try {
+      LogicalIOProcessorRuntimeTask lio =
+          new CleanupLogicalIOProcessorRuntimeTask(task1, 0, tezConf, null,
+              umbilical, serviceConsumerMetadata, new HashMap<String, String>(),
+              startedInputsMap, null, "", new ExecutionContextImpl("localhost"),
+              Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim(),
+              sharedExecutor);
+
+      TaskRunner2Callable runner = new TaskRunner2Callable(lio,
+          UserGroupInformation.getCurrentUser(), umbilical);
+
+      runner.call();
+
+      // cleanup() sends a CompositeDataMovementEvent; the trap handler must
+      // drop it. The only interaction with the umbilical is therefore a
+      // single addEvents call carrying an empty list.
+      Mockito.verify(umbilical, Mockito.only())
+          .addEvents(Collections.<TezEvent>emptyList());
+    } finally {
+      // Release the shared executor's threads so they do not leak into other
+      // tests.
+      sharedExecutor.shutdownNow();
+    }
+  }
+
/**
* We should expect no events being sent to the AM if an
* exception happens in the close method of the processor
@@ -167,7 +211,7 @@ public class TestLogicalIOProcessorRuntimeTask {
TezTaskAttemptID taId1 = createTaskAttemptID(vertexId, 1);
TaskSpec task1 = createTaskSpec(taId1, "dag1", "vertex1", 30,
- FaultyTestProcessor.class.getName(),
+ CloseExceptionProcessor.class.getName(),
TestOutputWithEvents.class.getName());
TezSharedExecutor sharedExecutor = new TezSharedExecutor(tezConf);
@@ -281,6 +325,31 @@ public class TestLogicalIOProcessorRuntimeTask {
return TezDAGID.getInstance("2000", 100, 1);
}
+  /**
+   * Runtime task whose cleanup() sends a CompositeDataMovementEvent through
+   * every output context. It simulates an output that (incorrectly) emits
+   * data movement events during cleanup.
+   */
+  private static class CleanupLogicalIOProcessorRuntimeTask
+      extends LogicalIOProcessorRuntimeTask {
+    CleanupLogicalIOProcessorRuntimeTask(TaskSpec taskSpec,
+        int appAttemptNumber, Configuration tezConf, String[] localDirs,
+        TezUmbilical tezUmbilical,
+        Map<String, ByteBuffer> serviceConsumerMetadata,
+        Map<String, String> envMap, Multimap<String, String> startedInputsMap,
+        ObjectRegistry objectRegistry, String pid,
+        ExecutionContext executionContext,
+        long memAvailable, boolean updateSysCounters, HadoopShim hadoopShim,
+        TezExecutors sharedExecutor) throws IOException {
+      super(taskSpec, appAttemptNumber, tezConf, localDirs, tezUmbilical,
+          serviceConsumerMetadata, envMap, startedInputsMap, objectRegistry,
+          pid, executionContext, memAvailable, updateSysCounters, hadoopShim,
+          sharedExecutor);
+    }
+
+    /**
+     * Sends one CompositeDataMovementEvent from each output context instead
+     * of performing the regular cleanup.
+     * NOTE(review): super.cleanup() is not invoked, so anything the real
+     * cleanup would release is left as-is. This is presumably acceptable for
+     * this test; confirm.
+     */
+    @Override
+    public void cleanup() throws InterruptedException {
+      getOutputContexts().forEach(context -> context.sendEvents(
+          Arrays.asList(CompositeDataMovementEvent.create(0, 0, null))));
+    }
+  }
+
public static class TestProcessor extends AbstractLogicalIOProcessor {
public static volatile int runCount = 0;
@@ -310,8 +379,30 @@ public class TestLogicalIOProcessorRuntimeTask {
}
- public static class FaultyTestProcessor extends TestProcessor {
- public FaultyTestProcessor(ProcessorContext context) {
+  /**
+   * Processor whose run() always fails. It is used to drive a task into
+   * cleanup after a processor failure.
+   */
+  public static class RunExceptionProcessor extends TestProcessor {
+
+    public RunExceptionProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void run(Map<String, LogicalInput> inputs,
+        Map<String, LogicalOutput> outputs) throws Exception {
+      // Thrown on purpose: the test needs the processor to fail in run().
+      throw new RuntimeException("Intentional failure in run()");
+    }
+
+    /**
+     * Must not be reached: close() should not be called once run() has
+     * thrown, so failing here makes such a call visible.
+     */
+    @Override
+    public void close() throws Exception {
+      throw new RuntimeException(
+          "close() must not be called after run() failed");
+    }
+  }
+
+ public static class CloseExceptionProcessor extends TestProcessor {
+ public CloseExceptionProcessor(ProcessorContext context) {
super(context);
}