You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2019/03/04 21:38:49 UTC
[tez] branch branch-0.9 updated: TEZ-4028: Events not visible from
proto history logging for s3a filesystem until dag completes (Harish JP,
via Gopal V)
This is an automated email from the ASF dual-hosted git repository.
jeagles pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 3624413 TEZ-4028: Events not visible from proto history logging for s3a filesystem until dag completes (Harish JP, via Gopal V)
3624413 is described below
commit 3624413b50532caa08c547537c06d225d650aea1
Author: Harish JP <ha...@apache.org>
AuthorDate: Sun Jan 27 19:27:46 2019 -0800
TEZ-4028: Events not visible from proto history logging for s3a filesystem until dag completes (Harish JP, via Gopal V)
Signed-off-by: Gopal V <go...@apache.org>
(cherry picked from commit c78d3487f5170321f985c058b8804d25b6eae48c)
---
.../org/apache/tez/dag/api/TezConfiguration.java | 13 +-
.../logging/proto/ProtoHistoryLoggingService.java | 14 ++-
.../proto/TestProtoHistoryLoggingService.java | 135 ++++++++++++++++++---
3 files changed, 144 insertions(+), 18 deletions(-)
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index a4edb51..8360240 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -55,7 +55,7 @@ public class TezConfiguration extends Configuration {
private final static Logger LOG = LoggerFactory.getLogger(TezConfiguration.class);
- private static Map<String, Scope> PropertyScope = new HashMap<String, Scope>();
+ private static Map<String, Scope> PropertyScope = new HashMap<>();
static {
Configuration.addDeprecation("tez.am.counters.max.keys", TezConfiguration.TEZ_COUNTERS_MAX);
@@ -1526,6 +1526,17 @@ public class TezConfiguration extends Configuration {
public static final long TEZ_HISTORY_LOGGING_PROTO_SYNC_WINDOWN_SECS_DEFAULT = 60L;
/**
+ * Boolean value. Set this to true if the underlying file system does not support flush (e.g. s3).
+ * The dag submitted, initialized and started events are written into one file, which is then
+ * closed. The rest of the events are written into another file.
+ */
+ @ConfigurationScope(Scope.AM)
+ @ConfigurationProperty(type="boolean")
+ public static final String TEZ_HISTORY_LOGGING_PROTO_SPLIT_DAG_START =
+ TEZ_PREFIX + "history.logging.split-dag-start";
+ public static final boolean TEZ_HISTORY_LOGGING_PROTO_SPLIT_DAG_START_DEFAULT = false;
+
+ /**
* Long value. The amount of time in seconds to wait to ensure all events for a day is synced
* to disk. This should be maximum time variation b/w machines + maximum time to sync file
* content and metadata.
diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java
index 206b1c1..d2e0b4d 100644
--- a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java
+++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java
@@ -45,6 +45,9 @@ import org.slf4j.LoggerFactory;
*/
public class ProtoHistoryLoggingService extends HistoryLoggingService {
private static final Logger LOG = LoggerFactory.getLogger(ProtoHistoryLoggingService.class);
+ // File suffix used when the start events and the remaining events go to separate files.
+ static final String SPLIT_DAG_EVENTS_FILE_SUFFIX = "_1";
+
private final HistoryEventProtoConverter converter =
new HistoryEventProtoConverter();
private boolean loggingDisabled = false;
@@ -64,6 +67,7 @@ public class ProtoHistoryLoggingService extends HistoryLoggingService {
private String appEventsFile;
private long appLaunchedEventOffset;
+ private boolean splitDagStartEvents;
public ProtoHistoryLoggingService() {
super(ProtoHistoryLoggingService.class.getName());
@@ -75,6 +79,8 @@ public class ProtoHistoryLoggingService extends HistoryLoggingService {
setConfig(conf);
loggingDisabled = !conf.getBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED,
TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT);
+ splitDagStartEvents = conf.getBoolean(TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_SPLIT_DAG_START,
+ TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_SPLIT_DAG_START_DEFAULT);
LOG.info("Inited ProtoHistoryLoggingService");
}
@@ -171,6 +177,13 @@ public class ProtoHistoryLoggingService extends HistoryLoggingService {
dagEventsWriter.writeProto(converter.convert(historyEvent));
} else if (dagEventsWriter != null) {
dagEventsWriter.writeProto(converter.convert(historyEvent));
+ if (splitDagStartEvents && type == HistoryEventType.DAG_STARTED) {
+ // Close the file and write submitted event offset into manifest.
+ finishCurrentDag(null);
+ dagEventsWriter = loggers.getDagEventsLogger().getWriter(dagId.toString()
+ + "_" + appContext.getApplicationAttemptId().getAttemptId()
+ + SPLIT_DAG_EVENTS_FILE_SUFFIX);
+ }
}
}
}
@@ -214,7 +227,6 @@ public class ProtoHistoryLoggingService extends HistoryLoggingService {
// into another dag.
IOUtils.closeQuietly(dagEventsWriter);
dagEventsWriter = null;
- currentDagId = null;
dagSubmittedEventOffset = -1;
}
}
diff --git a/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java b/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java
index bc79b07..11d5513 100644
--- a/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java
+++ b/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java
@@ -25,6 +25,7 @@ import java.io.EOFException;
import java.io.IOException;
import java.time.LocalDate;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -45,6 +46,8 @@ import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.AppLaunchedEvent;
import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
@@ -72,7 +75,7 @@ public class TestProtoHistoryLoggingService {
@Test
public void testService() throws Exception {
- ProtoHistoryLoggingService service = createService();
+ ProtoHistoryLoggingService service = createService(false);
service.start();
TezDAGID dagId = TezDAGID.getInstance(appId, 0);
List<HistoryEventProto> protos = new ArrayList<>();
@@ -87,19 +90,9 @@ public class TestProtoHistoryLoggingService {
// Verify dag events are logged.
DatePartitionedLogger<HistoryEventProto> dagLogger = loggers.getDagEventsLogger();
- Path dagFilePath = dagLogger.getPathForDate(LocalDate.ofEpochDay(0), dagId.toString() + "_" + 1);
+ Path dagFilePath = dagLogger.getPathForDate(LocalDate.ofEpochDay(0), dagId + "_1");
ProtoMessageReader<HistoryEventProto> reader = dagLogger.getReader(dagFilePath);
- HistoryEventProto evt = reader.readEvent();
- int ind = 1;
- while (evt != null) {
- Assert.assertEquals(protos.get(ind), evt);
- ind++;
- try {
- evt = reader.readEvent();
- } catch (EOFException e) {
- evt = null;
- }
- }
+ assertEventsRead(reader, protos, 1, protos.size());
reader.close();
// Verify app events are logged.
@@ -108,7 +101,7 @@ public class TestProtoHistoryLoggingService {
ProtoMessageReader<HistoryEventProto> appReader = appLogger.getReader(appFilePath);
long appOffset = appReader.getOffset();
Assert.assertEquals(protos.get(0), appReader.readEvent());
- reader.close();
+ appReader.close();
// Verify manifest events are logged.
DatePartitionedLogger<ManifestEntryProto> manifestLogger = loggers.getManifestEventsLogger();
@@ -125,7 +118,7 @@ public class TestProtoHistoryLoggingService {
// Verify offsets in manifest logger.
reader = dagLogger.getReader(new Path(manifest.getDagFilePath()));
reader.setOffset(manifest.getDagSubmittedEventOffset());
- evt = reader.readEvent();
+ HistoryEventProto evt = reader.readEvent();
Assert.assertNotNull(evt);
Assert.assertEquals(HistoryEventType.DAG_SUBMITTED.name(), evt.getEventType());
@@ -133,6 +126,7 @@ public class TestProtoHistoryLoggingService {
evt = reader.readEvent();
Assert.assertNotNull(evt);
Assert.assertEquals(HistoryEventType.DAG_FINISHED.name(), evt.getEventType());
+ reader.close();
// Verify manifest file scanner.
DagManifesFileScanner scanner = new DagManifesFileScanner(manifestLogger);
@@ -141,6 +135,91 @@ public class TestProtoHistoryLoggingService {
scanner.close();
}
+ @Test
+ public void testServiceSplitEvents() throws Exception {
+ ProtoHistoryLoggingService service = createService(true);
+ service.start();
+ TezDAGID dagId = TezDAGID.getInstance(appId, 0);
+ List<HistoryEventProto> protos = new ArrayList<>();
+ for (DAGHistoryEvent event : makeHistoryEvents(dagId, service)) {
+ protos.add(new HistoryEventProtoConverter().convert(event.getHistoryEvent()));
+ service.handle(event);
+ }
+ service.stop();
+
+ TezProtoLoggers loggers = new TezProtoLoggers();
+ Assert.assertTrue(loggers.setup(service.getConfig(), clock));
+
+ // Verify dag events are logged.
+ DatePartitionedLogger<HistoryEventProto> dagLogger = loggers.getDagEventsLogger();
+ Path dagFilePath1 = dagLogger.getPathForDate(LocalDate.ofEpochDay(0), dagId + "_1");
+ Path dagFilePath2 = dagLogger.getPathForDate(LocalDate.ofEpochDay(0), dagId + "_1" +
+ ProtoHistoryLoggingService.SPLIT_DAG_EVENTS_FILE_SUFFIX);
+
+ try (ProtoMessageReader<HistoryEventProto> reader = dagLogger.getReader(dagFilePath1)) {
+ assertEventsRead(reader, protos, 1, 1 + 3);
+ }
+
+ try (ProtoMessageReader<HistoryEventProto> reader = dagLogger.getReader(dagFilePath2)) {
+ assertEventsRead(reader, protos, 4, protos.size());
+ }
+
+ // Verify app events are logged.
+ DatePartitionedLogger<HistoryEventProto> appLogger = loggers.getAppEventsLogger();
+ Path appFilePath = appLogger.getPathForDate(LocalDate.ofEpochDay(0), attemptId.toString());
+ ProtoMessageReader<HistoryEventProto> appReader = appLogger.getReader(appFilePath);
+ long appOffset = appReader.getOffset();
+ Assert.assertEquals(protos.get(0), appReader.readEvent());
+ appReader.close();
+
+ // Verify manifest events are logged.
+ DatePartitionedLogger<ManifestEntryProto> manifestLogger = loggers.getManifestEventsLogger();
+ DagManifesFileScanner scanner = new DagManifesFileScanner(manifestLogger);
+ Path manifestFilePath = manifestLogger.getPathForDate(
+ LocalDate.ofEpochDay(0), attemptId.toString());
+ ProtoMessageReader<ManifestEntryProto> manifestReader = manifestLogger.getReader(
+ manifestFilePath);
+ ManifestEntryProto manifest = manifestReader.readEvent();
+ Assert.assertEquals(manifest, scanner.getNext());
+ Assert.assertEquals(appId.toString(), manifest.getAppId());
+ Assert.assertEquals(dagId.toString(), manifest.getDagId());
+ Assert.assertEquals(dagFilePath1.toString(), manifest.getDagFilePath());
+ Assert.assertEquals(appFilePath.toString(), manifest.getAppFilePath());
+ Assert.assertEquals(appOffset, manifest.getAppLaunchedEventOffset());
+ Assert.assertEquals(-1, manifest.getDagFinishedEventOffset());
+
+ HistoryEventProto evt = null;
+ // Verify offsets in manifest logger.
+ try (ProtoMessageReader<HistoryEventProto> reader = dagLogger.getReader(
+ new Path(manifest.getDagFilePath()))) {
+ reader.setOffset(manifest.getDagSubmittedEventOffset());
+ evt = reader.readEvent();
+ Assert.assertNotNull(evt);
+ Assert.assertEquals(HistoryEventType.DAG_SUBMITTED.name(), evt.getEventType());
+ }
+
+ manifest = manifestReader.readEvent();
+ Assert.assertEquals(manifest, scanner.getNext());
+ Assert.assertEquals(appId.toString(), manifest.getAppId());
+ Assert.assertEquals(dagId.toString(), manifest.getDagId());
+ Assert.assertEquals(dagFilePath2.toString(), manifest.getDagFilePath());
+ Assert.assertEquals(appFilePath.toString(), manifest.getAppFilePath());
+ Assert.assertEquals(appOffset, manifest.getAppLaunchedEventOffset());
+ Assert.assertEquals(-1, manifest.getDagSubmittedEventOffset());
+
+ try (ProtoMessageReader<HistoryEventProto> reader = dagLogger.getReader(
+ new Path(manifest.getDagFilePath()))) {
+ reader.setOffset(manifest.getDagFinishedEventOffset());
+ evt = reader.readEvent();
+ Assert.assertNotNull(evt);
+ Assert.assertEquals(HistoryEventType.DAG_FINISHED.name(), evt.getEventType());
+ }
+
+ // Verify manifest file scanner.
+ Assert.assertNull(scanner.getNext());
+ scanner.close();
+ }
+
private List<DAGHistoryEvent> makeHistoryEvents(TezDAGID dagId,
ProtoHistoryLoggingService service) {
List<DAGHistoryEvent> historyEvents = new ArrayList<>();
@@ -152,6 +231,11 @@ public class TestProtoHistoryLoggingService {
new VersionInfo("component", "1.1.0", "rev1", "20120101", "git.apache.org") {})));
historyEvents.add(new DAGHistoryEvent(dagId, new DAGSubmittedEvent(dagId, time,
DAGPlan.getDefaultInstance(), attemptId, null, user, conf, null, "default")));
+ historyEvents.add(new DAGHistoryEvent(dagId, new DAGInitializedEvent(dagId, time + 1, user,
+ "test_dag", Collections.emptyMap())));
+ historyEvents.add(new DAGHistoryEvent(dagId, new DAGStartedEvent(dagId, time + 2, user,
+ "test_dag")));
+
TezVertexID vertexID = TezVertexID.getInstance(dagId, 1);
historyEvents.add(new DAGHistoryEvent(dagId, new VertexStartedEvent(vertexID, time, time)));
TezTaskID tezTaskID = TezTaskID.getInstance(vertexID, 1);
@@ -181,7 +265,7 @@ public class TestProtoHistoryLoggingService {
}
}
- private ProtoHistoryLoggingService createService() throws IOException {
+ private ProtoHistoryLoggingService createService(boolean splitEvents) throws IOException {
ProtoHistoryLoggingService service = new ProtoHistoryLoggingService();
clock = new FixedClock(0); // Start time is always first day, easier to write tests.
AppContext appContext = mock(AppContext.class);
@@ -194,7 +278,26 @@ public class TestProtoHistoryLoggingService {
Configuration conf = new Configuration(false);
String basePath = tempFolder.newFolder().getAbsolutePath();
conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_BASE_DIR, basePath);
+ conf.setBoolean(TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_SPLIT_DAG_START, splitEvents);
service.init(conf);
return service;
}
+
+ private void assertEventsRead(ProtoMessageReader<HistoryEventProto> reader,
+ List<HistoryEventProto> protos, int start, int finish) throws Exception {
+ for (int i = start; i < finish; ++i) {
+ try {
+ HistoryEventProto evt = reader.readEvent();
+ Assert.assertEquals(protos.get(i), evt);
+ } catch (EOFException e) {
+ Assert.fail("Unexpected eof");
+ }
+ }
+ try {
+ HistoryEventProto evt = reader.readEvent();
+ Assert.assertNull(evt);
+ } catch (EOFException e) {
+ // Expected.
+ }
+ }
}