You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by go...@apache.org on 2019/01/28 03:28:35 UTC

[tez] branch master updated: TEZ-4028: Events not visible from proto history logging for s3a filesystem until dag completes (Harish JP, via Gopal V)

This is an automated email from the ASF dual-hosted git repository.

gopalv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new c78d348  TEZ-4028: Events not visible from proto history logging for s3a filesystem until dag completes (Harish JP, via Gopal V)
c78d348 is described below

commit c78d3487f5170321f985c058b8804d25b6eae48c
Author: Harish JP <ha...@apache.org>
AuthorDate: Sun Jan 27 19:27:46 2019 -0800

    TEZ-4028: Events not visible from proto history logging for s3a filesystem until dag completes (Harish JP, via Gopal V)
    
    Signed-off-by: Gopal V <go...@apache.org>
---
 .../org/apache/tez/dag/api/TezConfiguration.java   |  23 +++-
 .../logging/proto/ProtoHistoryLoggingService.java  |  14 ++-
 .../proto/TestProtoHistoryLoggingService.java      | 137 ++++++++++++++++++---
 3 files changed, 150 insertions(+), 24 deletions(-)

diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 791e634..7c52122 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -26,17 +26,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.tez.common.annotation.ConfigurationClass;
-import org.apache.tez.common.annotation.ConfigurationProperty;
-import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.common.annotation.ConfigurationClass;
+import org.apache.tez.common.annotation.ConfigurationProperty;
+import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -56,7 +56,7 @@ public class TezConfiguration extends Configuration {
 
   private final static Logger LOG = LoggerFactory.getLogger(TezConfiguration.class);
 
-  private static Map<String, Scope> PropertyScope = new HashMap<String, Scope>();
+  private static Map<String, Scope> PropertyScope = new HashMap<>();
 
   static {
     Configuration.addDeprecation("tez.am.counters.max.keys", TezConfiguration.TEZ_COUNTERS_MAX);
@@ -1488,6 +1488,17 @@ public class TezConfiguration extends Configuration {
   public static final long TEZ_HISTORY_LOGGING_PROTO_SYNC_WINDOWN_SECS_DEFAULT = 60L;
 
   /**
+   * Boolean value. Set this to true if the underlying file system does not support flush (e.g. S3).
+   * The DAG submitted, initialized and started events are written into one file, which is then
+   * closed; the rest of the events are written into another file.
+   */
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty(type="boolean")
+  public static final String TEZ_HISTORY_LOGGING_PROTO_SPLIT_DAG_START =
+      TEZ_PREFIX + "history.logging.split-dag-start";
+  public static final boolean TEZ_HISTORY_LOGGING_PROTO_SPLIT_DAG_START_DEFAULT = false;
+
+  /**
    * Long value. The amount of time in seconds to wait to ensure all events for a day is synced
    * to disk. This should be maximum time variation b/w machines + maximum time to sync file
    * content and metadata.
diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java
index 206b1c1..d2e0b4d 100644
--- a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java
+++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java
@@ -45,6 +45,9 @@ import org.slf4j.LoggerFactory;
  */
 public class ProtoHistoryLoggingService extends HistoryLoggingService {
   private static final Logger LOG = LoggerFactory.getLogger(ProtoHistoryLoggingService.class);
+  // File suffix used when DAG start events and the remaining events go to separate files.
+  static final String SPLIT_DAG_EVENTS_FILE_SUFFIX = "_1";
+
   private final HistoryEventProtoConverter converter =
       new HistoryEventProtoConverter();
   private boolean loggingDisabled = false;
@@ -64,6 +67,7 @@ public class ProtoHistoryLoggingService extends HistoryLoggingService {
 
   private String appEventsFile;
   private long appLaunchedEventOffset;
+  private boolean splitDagStartEvents;
 
   public ProtoHistoryLoggingService() {
     super(ProtoHistoryLoggingService.class.getName());
@@ -75,6 +79,8 @@ public class ProtoHistoryLoggingService extends HistoryLoggingService {
     setConfig(conf);
     loggingDisabled = !conf.getBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED,
         TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT);
+    splitDagStartEvents = conf.getBoolean(TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_SPLIT_DAG_START,
+        TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_SPLIT_DAG_START_DEFAULT);
     LOG.info("Inited ProtoHistoryLoggingService");
   }
 
@@ -171,6 +177,13 @@ public class ProtoHistoryLoggingService extends HistoryLoggingService {
         dagEventsWriter.writeProto(converter.convert(historyEvent));
       } else if (dagEventsWriter != null) {
         dagEventsWriter.writeProto(converter.convert(historyEvent));
+        if (splitDagStartEvents && type == HistoryEventType.DAG_STARTED) {
+          // Close the file and write submitted event offset into manifest.
+          finishCurrentDag(null);
+          dagEventsWriter = loggers.getDagEventsLogger().getWriter(dagId.toString()
+              + "_" + appContext.getApplicationAttemptId().getAttemptId()
+              + SPLIT_DAG_EVENTS_FILE_SUFFIX);
+        }
       }
     }
   }
@@ -214,7 +227,6 @@ public class ProtoHistoryLoggingService extends HistoryLoggingService {
       // into another dag.
       IOUtils.closeQuietly(dagEventsWriter);
       dagEventsWriter = null;
-      currentDagId = null;
       dagSubmittedEventOffset = -1;
     }
   }
diff --git a/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java b/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java
index bc79b07..143faed 100644
--- a/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java
+++ b/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java
@@ -25,6 +25,7 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.time.LocalDate;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 
@@ -45,6 +46,8 @@ import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.events.AppLaunchedEvent;
 import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
 import org.apache.tez.dag.history.events.DAGSubmittedEvent;
 import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
 import org.apache.tez.dag.history.events.TaskStartedEvent;
@@ -72,7 +75,7 @@ public class TestProtoHistoryLoggingService {
 
   @Test
   public void testService() throws Exception {
-    ProtoHistoryLoggingService service = createService();
+    ProtoHistoryLoggingService service = createService(false);
     service.start();
     TezDAGID dagId = TezDAGID.getInstance(appId, 0);
     List<HistoryEventProto> protos = new ArrayList<>();
@@ -87,19 +90,9 @@ public class TestProtoHistoryLoggingService {
 
     // Verify dag events are logged.
     DatePartitionedLogger<HistoryEventProto> dagLogger = loggers.getDagEventsLogger();
-    Path dagFilePath = dagLogger.getPathForDate(LocalDate.ofEpochDay(0), dagId.toString() + "_" + 1);
+    Path dagFilePath = dagLogger.getPathForDate(LocalDate.ofEpochDay(0), dagId + "_1");
     ProtoMessageReader<HistoryEventProto> reader = dagLogger.getReader(dagFilePath);
-    HistoryEventProto evt = reader.readEvent();
-    int ind = 1;
-    while (evt != null) {
-      Assert.assertEquals(protos.get(ind), evt);
-      ind++;
-      try {
-        evt = reader.readEvent();
-      } catch (EOFException e) {
-        evt = null;
-      }
-    }
+    assertEventsRead(reader, protos, 1, protos.size());
     reader.close();
 
     // Verify app events are logged.
@@ -108,7 +101,7 @@ public class TestProtoHistoryLoggingService {
     ProtoMessageReader<HistoryEventProto> appReader = appLogger.getReader(appFilePath);
     long appOffset = appReader.getOffset();
     Assert.assertEquals(protos.get(0), appReader.readEvent());
-    reader.close();
+    appReader.close();
 
     // Verify manifest events are logged.
     DatePartitionedLogger<ManifestEntryProto> manifestLogger = loggers.getManifestEventsLogger();
@@ -125,7 +118,7 @@ public class TestProtoHistoryLoggingService {
     // Verify offsets in manifest logger.
     reader = dagLogger.getReader(new Path(manifest.getDagFilePath()));
     reader.setOffset(manifest.getDagSubmittedEventOffset());
-    evt = reader.readEvent();
+    HistoryEventProto evt = reader.readEvent();
     Assert.assertNotNull(evt);
     Assert.assertEquals(HistoryEventType.DAG_SUBMITTED.name(), evt.getEventType());
 
@@ -133,6 +126,7 @@ public class TestProtoHistoryLoggingService {
     evt = reader.readEvent();
     Assert.assertNotNull(evt);
     Assert.assertEquals(HistoryEventType.DAG_FINISHED.name(), evt.getEventType());
+    reader.close();
 
     // Verify manifest file scanner.
     DagManifesFileScanner scanner = new DagManifesFileScanner(manifestLogger);
@@ -141,6 +135,91 @@ public class TestProtoHistoryLoggingService {
     scanner.close();
   }
 
+  @Test
+  public void testServiceSplitEvents() throws Exception {
+    ProtoHistoryLoggingService service = createService(true);
+    service.start();
+    TezDAGID dagId = TezDAGID.getInstance(appId, 0);
+    List<HistoryEventProto> protos = new ArrayList<>();
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId, service)) {
+      protos.add(new HistoryEventProtoConverter().convert(event.getHistoryEvent()));
+      service.handle(event);
+    }
+    service.stop();
+
+    TezProtoLoggers loggers = new TezProtoLoggers();
+    Assert.assertTrue(loggers.setup(service.getConfig(), clock));
+
+    // Verify dag events are logged.
+    DatePartitionedLogger<HistoryEventProto> dagLogger = loggers.getDagEventsLogger();
+    Path dagFilePath1 = dagLogger.getPathForDate(LocalDate.ofEpochDay(0), dagId + "_1");
+    Path dagFilePath2 = dagLogger.getPathForDate(LocalDate.ofEpochDay(0), dagId + "_1" +
+        ProtoHistoryLoggingService.SPLIT_DAG_EVENTS_FILE_SUFFIX);
+
+    try (ProtoMessageReader<HistoryEventProto> reader = dagLogger.getReader(dagFilePath1)) {
+      assertEventsRead(reader, protos, 1, 1 + 3);
+    }
+
+    try (ProtoMessageReader<HistoryEventProto> reader = dagLogger.getReader(dagFilePath2)) {
+      assertEventsRead(reader, protos, 4, protos.size());
+    }
+
+    // Verify app events are logged.
+    DatePartitionedLogger<HistoryEventProto> appLogger = loggers.getAppEventsLogger();
+    Path appFilePath = appLogger.getPathForDate(LocalDate.ofEpochDay(0), attemptId.toString());
+    ProtoMessageReader<HistoryEventProto> appReader = appLogger.getReader(appFilePath);
+    long appOffset = appReader.getOffset();
+    Assert.assertEquals(protos.get(0), appReader.readEvent());
+    appReader.close();
+
+    // Verify manifest events are logged.
+    DatePartitionedLogger<ManifestEntryProto> manifestLogger = loggers.getManifestEventsLogger();
+    DagManifesFileScanner scanner = new DagManifesFileScanner(manifestLogger);
+    Path manifestFilePath = manifestLogger.getPathForDate(
+        LocalDate.ofEpochDay(0), attemptId.toString());
+    ProtoMessageReader<ManifestEntryProto> manifestReader = manifestLogger.getReader(
+        manifestFilePath);
+    ManifestEntryProto manifest = manifestReader.readEvent();
+    Assert.assertEquals(manifest, scanner.getNext());
+    Assert.assertEquals(appId.toString(), manifest.getAppId());
+    Assert.assertEquals(dagId.toString(), manifest.getDagId());
+    Assert.assertEquals(dagFilePath1.toString(), manifest.getDagFilePath());
+    Assert.assertEquals(appFilePath.toString(), manifest.getAppFilePath());
+    Assert.assertEquals(appOffset, manifest.getAppLaunchedEventOffset());
+    Assert.assertEquals(-1, manifest.getDagFinishedEventOffset());
+
+    HistoryEventProto evt = null;
+    // Verify offsets in manifest logger.
+    try (ProtoMessageReader<HistoryEventProto> reader = dagLogger.getReader(
+        new Path(manifest.getDagFilePath()))) {
+      reader.setOffset(manifest.getDagSubmittedEventOffset());
+      evt = reader.readEvent();
+      Assert.assertNotNull(evt);
+      Assert.assertEquals(HistoryEventType.DAG_SUBMITTED.name(), evt.getEventType());
+    }
+
+    manifest = manifestReader.readEvent();
+    Assert.assertEquals(manifest, scanner.getNext());
+    Assert.assertEquals(appId.toString(), manifest.getAppId());
+    Assert.assertEquals(dagId.toString(), manifest.getDagId());
+    Assert.assertEquals(dagFilePath2.toString(), manifest.getDagFilePath());
+    Assert.assertEquals(appFilePath.toString(), manifest.getAppFilePath());
+    Assert.assertEquals(appOffset, manifest.getAppLaunchedEventOffset());
+    Assert.assertEquals(-1, manifest.getDagSubmittedEventOffset());
+
+    try (ProtoMessageReader<HistoryEventProto> reader = dagLogger.getReader(
+        new Path(manifest.getDagFilePath()))) {
+      reader.setOffset(manifest.getDagFinishedEventOffset());
+      evt = reader.readEvent();
+      Assert.assertNotNull(evt);
+      Assert.assertEquals(HistoryEventType.DAG_FINISHED.name(), evt.getEventType());
+    }
+
+    // Verify manifest file scanner.
+    Assert.assertNull(scanner.getNext());
+    scanner.close();
+  }
+
   private List<DAGHistoryEvent> makeHistoryEvents(TezDAGID dagId,
       ProtoHistoryLoggingService service) {
     List<DAGHistoryEvent> historyEvents = new ArrayList<>();
@@ -152,6 +231,11 @@ public class TestProtoHistoryLoggingService {
         new VersionInfo("component", "1.1.0", "rev1", "20120101", "git.apache.org") {})));
     historyEvents.add(new DAGHistoryEvent(dagId, new DAGSubmittedEvent(dagId, time,
         DAGPlan.getDefaultInstance(), attemptId, null, user, conf, null, "default")));
+    historyEvents.add(new DAGHistoryEvent(dagId, new DAGInitializedEvent(dagId, time + 1, user,
+        "test_dag", Collections.emptyMap())));
+    historyEvents.add(new DAGHistoryEvent(dagId, new DAGStartedEvent(dagId, time + 2, user,
+        "test_dag")));
+
     TezVertexID vertexID = TezVertexID.getInstance(dagId, 1);
     historyEvents.add(new DAGHistoryEvent(dagId, new VertexStartedEvent(vertexID, time, time)));
     TezTaskID tezTaskID = TezTaskID.getInstance(vertexID, 1);
@@ -168,7 +252,7 @@ public class TestProtoHistoryLoggingService {
   }
 
   private static class FixedClock implements Clock {
-    final Clock clock = new SystemClock();
+    final Clock clock = SystemClock.getInstance();
     final long diff;
 
     public FixedClock(long startTime) {
@@ -181,7 +265,7 @@ public class TestProtoHistoryLoggingService {
     }
   }
 
-  private ProtoHistoryLoggingService createService() throws IOException {
+  private ProtoHistoryLoggingService createService(boolean splitEvents) throws IOException {
     ProtoHistoryLoggingService service = new ProtoHistoryLoggingService();
     clock = new FixedClock(0); // Start time is always first day, easier to write tests.
     AppContext appContext = mock(AppContext.class);
@@ -194,7 +278,26 @@ public class TestProtoHistoryLoggingService {
     Configuration conf = new Configuration(false);
     String basePath = tempFolder.newFolder().getAbsolutePath();
     conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_BASE_DIR, basePath);
+    conf.setBoolean(TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_SPLIT_DAG_START, splitEvents);
     service.init(conf);
     return service;
   }
+
+  private void assertEventsRead(ProtoMessageReader<HistoryEventProto> reader,
+      List<HistoryEventProto> protos, int start, int finish) throws Exception {
+    for (int i = start; i < finish; ++i) {
+      try {
+        HistoryEventProto evt = reader.readEvent();
+        Assert.assertEquals(protos.get(i), evt);
+      } catch (EOFException e) {
+        Assert.fail("Unexpected eof");
+      }
+    }
+    try {
+      HistoryEventProto evt = reader.readEvent();
+      Assert.assertNull(evt);
+    } catch (EOFException e) {
+      // Expected.
+    }
+  }
 }