You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@tez.apache.org by zh...@apache.org on 2017/03/28 19:47:43 UTC

[02/50] [abbrv] tez git commit: TEZ-3581. Add different logger to enable suppressing logs for specific lines. Contributed by Harish Jaiprakash.

TEZ-3581. Add different logger to enable suppressing logs for specific
lines. Contributed by Harish Jaiprakash.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d415197c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d415197c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d415197c

Branch: refs/heads/TEZ-1190
Commit: d415197c67562ee2859b240e8a7e316067c4ed6b
Parents: c0270cb
Author: Siddharth Seth <ss...@apache.org>
Authored: Sun Feb 5 19:08:08 2017 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Sun Feb 5 19:08:08 2017 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../tez/dag/history/HistoryEventHandler.java    | 23 +++--
 .../resources/tez-container-log4j.properties    |  8 ++
 .../org/apache/tez/http/HttpConnection.java     | 18 +++-
 .../library/common/shuffle/ShuffleUtils.java    | 92 ++++++++++++++------
 .../common/shuffle/impl/ShuffleManager.java     |  7 +-
 .../orderedgrouped/ShuffleScheduler.java        |  8 +-
 .../common/shuffle/TestShuffleUtils.java        | 34 ++++++--
 8 files changed, 144 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/d415197c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 37438d9..c4a1d72 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3581. Add different logger to enable suppressing logs for specific lines.
   TEZ-3601. Add another HistoryLogLevel to suppress TaskAttempts at specific levels
   TEZ-3600. Fix flaky test: TestTokenCache
   TEZ-3589. add a unit test for amKeepAlive not being shutdown if an app takes a long time to launch.

http://git-wip-us.apache.org/repos/asf/tez/blob/d415197c/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
index 79d1fc3..4fa1926 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,6 +45,8 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 public class HistoryEventHandler extends CompositeService {
 
   private static Logger LOG = LoggerFactory.getLogger(HistoryEventHandler.class);
+  private static Logger LOG_CRITICAL_EVENTS =
+      LoggerFactory.getLogger(LOG.getName() + ".criticalEvents");
 
   private final AppContext context;
   private RecoveryService recoveryService;
@@ -59,6 +62,8 @@ public class HistoryEventHandler extends CompositeService {
   private final ConcurrentHashMap<TezTaskAttemptID, DAGHistoryEvent> suppressedEvents =
       new ConcurrentHashMap<>();
 
+  private final AtomicLong criticalEventCount = new AtomicLong();
+
   public HistoryEventHandler(AppContext context) {
     super(HistoryEventHandler.class.getName());
     this.context = context;
@@ -141,12 +146,18 @@ public class HistoryEventHandler extends CompositeService {
       historyLoggingService.handle(event);
     }
 
-    // TODO at some point we should look at removing this once
-    // there is a UI in place
-    LOG.info("[HISTORY]"
-        + "[DAG:" + dagIdStr + "]"
-        + "[Event:" + event.getHistoryEvent().getEventType().name() + "]"
-        + ": " + event.getHistoryEvent().toString());
+    if (LOG_CRITICAL_EVENTS.isInfoEnabled()) {
+      // TODO at some point we should look at removing this once
+      // there is a UI in place
+      LOG_CRITICAL_EVENTS.info("[HISTORY]"
+          + "[DAG:" + dagIdStr + "]"
+          + "[Event:" + event.getHistoryEvent().getEventType().name() + "]"
+          + ": " + event.getHistoryEvent().toString());
+    } else {
+      if (criticalEventCount.incrementAndGet() % 1000 == 0) {
+        LOG.info("Got {} critical events", criticalEventCount);
+      }
+    }
   }
 
   private boolean shouldLogEvent(DAGHistoryEvent event) {

http://git-wip-us.apache.org/repos/asf/tez/blob/d415197c/tez-dag/src/main/resources/tez-container-log4j.properties
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/resources/tez-container-log4j.properties b/tez-dag/src/main/resources/tez-container-log4j.properties
index 4620a78..721cd67 100644
--- a/tez-dag/src/main/resources/tez-container-log4j.properties
+++ b/tez-dag/src/main/resources/tez-container-log4j.properties
@@ -36,3 +36,11 @@ log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} [%p] [%t] |%c{2}|: %m%n
 #
 log4j.appender.EventCounter=org.apache.hadoop.log.metrics.EventCounter
 
+# Uncomment the loggers below to suppress very verbose log lines and reduce log sizes. Note that
+# this affects the analyzer, which relies on these log lines.
+# log4j.logger.org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager.fetch=WARN
+# log4j.logger.org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleScheduler=WARN
+# log4j.logger.org.apache.tez.http.HttpConnection.url=WARN
+
+# The following setting must go in the AM's log4j.properties file; it has no effect in this file.
+# log4j.logger.org.apache.tez.dag.history.HistoryEventHandler.criticalEvents=WARN

http://git-wip-us.apache.org/repos/asf/tez/blob/d415197c/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java
index d781e64..9bfe4e7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java
@@ -36,10 +36,15 @@ import java.io.InputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;

 public class HttpConnection extends BaseHttpConnection {

   private static final Logger LOG = LoggerFactory.getLogger(HttpConnection.class);
+  private static final Logger URL_LOG = LoggerFactory.getLogger(LOG.getName() + ".url");
+  // Shared by all instances so the periodic "Sent hash and ... reply" summary counts URLs across
+  // every connection instead of restarting from zero for each new HttpConnection.
+  private static final AtomicLong urlLogCount = new AtomicLong();

   private URL url;
   private final String logIdentifier;
@@ -229,9 +234,17 @@ public class HttpConnection extends BaseHttpConnection {

     // verify that replyHash is HMac of encHash
     SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecretMgr);
-    //Following log statement will be used by tez-tool perf-analyzer for mapping attempt to NM host
-    LOG.info("for url=" + url +
-        " sent hash and receievd reply " + stopWatch.now(TimeUnit.MILLISECONDS) + " ms");
+    if (URL_LOG.isInfoEnabled()) {
+      // Following log statement is used by tez-tool perf-analyzer to map attempts to NM hosts.
+      URL_LOG.info("for url=" + url + " sent hash and receievd reply " +
+          stopWatch.now(TimeUnit.MILLISECONDS) + " ms");
+    } else {
+      // Log only a summary, once every 1000 urls.
+      long count = urlLogCount.incrementAndGet();
+      if (count % 1000 == 0) {
+        LOG.info("Sent hash and recieved reply for {} urls", count);
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tez/blob/d415197c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index aa07233..82e844d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -29,6 +29,7 @@ import java.text.DecimalFormat;
 import java.util.BitSet;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.Deflater;
 
 import javax.annotation.Nullable;
@@ -516,35 +517,70 @@ public class ShuffleUtils {
     return builder.build();
   }
 
-  /**
-   * Log individual fetch complete event.
-   * This log information would be used by tez-tool/perf-analzyer/shuffle tools for mining
-   * - amount of data transferred between source to destination machine
-   * - time taken to transfer data between source to destination machine
-   * - details on DISK/DISK_DIRECT/MEMORY based shuffles
-   *
-   * @param log
-   * @param millis
-   * @param bytesCompressed
-   * @param bytesDecompressed
-   * @param outputType
-   * @param srcAttemptIdentifier
-   */
-  public static void logIndividualFetchComplete(Logger log, long millis, long
-      bytesCompressed,
-      long bytesDecompressed, String outputType, InputAttemptIdentifier srcAttemptIdentifier) {
-    double rate = 0;
-    if (millis != 0) {
-      rate = bytesCompressed / ((double) millis / 1000);
-      rate = rate / (1024 * 1024);
+  public static class FetchStatsLogger {
+    private final Logger activeLogger;
+    private final Logger aggregateLogger;
+    private final AtomicLong logCount = new AtomicLong();
+    // Per-window totals, reset every 1000 fetches (logCount above is never reset).
+    private final AtomicLong compressedSize = new AtomicLong();
+    private final AtomicLong decompressedSize = new AtomicLong();
+    private final AtomicLong totalTime = new AtomicLong();
+
+    public FetchStatsLogger(Logger activeLogger, Logger aggregateLogger) {
+      this.activeLogger = activeLogger;
+      this.aggregateLogger = aggregateLogger;
+    }
+
+    /**
+     * Log individual fetch complete event. If activeLogger is INFO-enabled, a detailed line is
+     * logged for every fetch; otherwise stats are aggregated and logged via aggregateLogger once
+     * every 1000 fetches. The detailed line is used by tez-tool/perf-analyzer/shuffle tools to mine
+     * - amount of data transferred and time taken between source and destination machines
+     * - details on DISK/DISK_DIRECT/MEMORY based shuffles
+     *
+     * @param millis time taken by the fetch, in milliseconds
+     * @param bytesCompressed bytes fetched (logged as csize)
+     * @param bytesDecompressed decompressed size of the fetched data (logged as dsize)
+     * @param outputType type of the fetched output, e.g. DISK/DISK_DIRECT/MEMORY
+     * @param srcAttemptIdentifier source attempt the data was fetched from
+     */
+    public void logIndividualFetchComplete(long millis, long bytesCompressed,
+        long bytesDecompressed, String outputType, InputAttemptIdentifier srcAttemptIdentifier) {
+      double rate = 0;
+      if (millis != 0) {
+        rate = bytesCompressed / ((double) millis / 1000);
+        rate = rate / (1024 * 1024);
+      }
+      if (activeLogger.isInfoEnabled()) {
+        activeLogger.info("Completed fetch for attempt: " + toShortString(srcAttemptIdentifier)
+            + " to " + outputType
+            + ", csize=" + bytesCompressed + ", dsize=" + bytesDecompressed
+            + ", EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis
+            + ", Rate=" + MBPS_FORMAT.get().format(rate) + " MB/s");
+      } else {
+        long currentCount, currentCompressedSize, currentDecompressedSize, currentTotalTime;
+        synchronized (this) { // keep each 1000-fetch window and its reset consistent
+          currentCount = logCount.incrementAndGet();
+          currentCompressedSize = compressedSize.addAndGet(bytesCompressed);
+          currentDecompressedSize = decompressedSize.addAndGet(bytesDecompressed);
+          currentTotalTime = totalTime.addAndGet(millis);
+          if (currentCount % 1000 == 0) {
+            compressedSize.set(0);
+            decompressedSize.set(0);
+            totalTime.set(0);
+          }
+        }
+        if (currentCount % 1000 == 0) {
+          // Same units as the per-fetch rate: bytes / (ms / 1000) = bytes/s, then to MB/s.
+          double avgRate = currentTotalTime == 0 ? 0
+              : currentCompressedSize / ((double) currentTotalTime / 1000) / (1024 * 1024);
+          aggregateLogger.info("Completed {} fetches, stats for last 1000 fetches: "
+              + "avg csize: {}, avg dsize: {}, avgTime: {}, avgRate: {}", currentCount,
+              currentCompressedSize / 1000, currentDecompressedSize / 1000, currentTotalTime / 1000,
+              MBPS_FORMAT.get().format(avgRate));
+        }
+      }
+    }
-    log.info(
-        "Completed fetch for attempt: "
-            + toShortString(srcAttemptIdentifier)
-            +" to " + outputType +
-            ", csize=" + bytesCompressed + ", dsize=" + bytesDecompressed +
-            ", EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" +
-            MBPS_FORMAT.get().format(rate) + " MB/s");
   }
 
   private static String toShortString(InputAttemptIdentifier inputAttemptIdentifier) {

http://git-wip-us.apache.org/repos/asf/tez/blob/d415197c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index d034b2e..b2ff51d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -80,6 +80,7 @@ import org.apache.tez.runtime.library.common.shuffle.HostPort;
 import org.apache.tez.runtime.library.common.shuffle.InputHost;
 import org.apache.tez.runtime.library.common.shuffle.InputHost.PartitionToInputs;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils.FetchStatsLogger;
 
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
@@ -98,6 +99,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 public class ShuffleManager implements FetcherCallback {
 
   private static final Logger LOG = LoggerFactory.getLogger(ShuffleManager.class);
+  private static final Logger LOG_FETCH = LoggerFactory.getLogger(LOG.getName() + ".fetch");
+  private static final FetchStatsLogger fetchStatsLogger = new FetchStatsLogger(LOG_FETCH, LOG);
 
   private final InputContext inputContext;
   private final int numInputs;
@@ -628,8 +631,8 @@ public class ShuffleManager implements FetcherCallback {
         if (!completedInputSet.contains(inputIdentifier)) {
           fetchedInput.commit();
           committed = true;
-          ShuffleUtils.logIndividualFetchComplete(LOG, copyDuration,
-              fetchedBytes, decompressedLength, fetchedInput.getType().toString(), srcAttemptIdentifier);
+          fetchStatsLogger.logIndividualFetchComplete(copyDuration, fetchedBytes,
+              decompressedLength, fetchedInput.getType().toString(), srcAttemptIdentifier);
 
           // Processing counters for completed and commit fetches only. Need
           // additional counters for excessive fetches - which primarily comes

http://git-wip-us.apache.org/repos/asf/tez/blob/d415197c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 3d2c1ad..cce486c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -73,6 +73,7 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils.FetchStatsLogger;
 import org.apache.tez.runtime.library.common.shuffle.HostPort;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapHost.HostPortPartition;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Type;
@@ -140,6 +141,9 @@ class ShuffleScheduler {
   private final AtomicLong shuffleStart = new AtomicLong(0);
 
   private static final Logger LOG = LoggerFactory.getLogger(ShuffleScheduler.class);
+  private static final Logger LOG_FETCH = LoggerFactory.getLogger(LOG.getName() + ".fetch");
+  private static final FetchStatsLogger fetchStatsLogger = new FetchStatsLogger(LOG_FETCH, LOG);
+
   static final long INITIAL_PENALTY = 2000L; // 2 seconds
   private static final float PENALTY_GROWTH_RATE = 1.3f;
 
@@ -576,8 +580,8 @@ class ShuffleScheduler {
         }
 
         output.commit();
-        ShuffleUtils.logIndividualFetchComplete(LOG, millis, bytesCompressed,
-            bytesDecompressed, output.getType().toString(), srcAttemptIdentifier);
+        fetchStatsLogger.logIndividualFetchComplete(millis, bytesCompressed, bytesDecompressed,
+            output.getType().toString(), srcAttemptIdentifier);
         if (output.getType() == Type.DISK) {
           bytesShuffledToDisk.increment(bytesCompressed);
         } else if (output.getType() == Type.DISK_DIRECT) {

http://git-wip-us.apache.org/repos/asf/tez/blob/d415197c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
index 496468b..f21da7c 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
@@ -4,7 +4,6 @@ import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -24,7 +23,8 @@ import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.common.sort.impl.IFileOutputStream;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils.FetchStatsLogger;
 import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
 import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
@@ -32,6 +32,7 @@ import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Matchers;
 import org.slf4j.Logger;
 
 import java.io.ByteArrayInputStream;
@@ -47,8 +48,11 @@ import java.util.Random;
 
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /**
@@ -70,10 +74,6 @@ import static org.mockito.Mockito.when;
  */
 public class TestShuffleUtils {
 
-  private static final String HOST = "localhost";
-  private static final int PORT = 8080;
-  private static final String PATH_COMPONENT = "attempt";
-
   private OutputContext outputContext;
   private Configuration conf;
   private FileSystem localFs;
@@ -313,4 +313,26 @@ public class TestShuffleUtils {
     } catch (IOException e) {
     }
   }
+
+  @Test
+  public void testFetchStatsLogger() throws Exception {
+    Logger activeLogger = mock(Logger.class);
+    Logger aggregateLogger = mock(Logger.class);
+    FetchStatsLogger logger = new FetchStatsLogger(activeLogger, aggregateLogger);
+    // Per-fetch logging disabled: 1000 fetches must produce exactly one aggregate summary.
+    InputAttemptIdentifier ident = new InputAttemptIdentifier(1, 1);
+    when(activeLogger.isInfoEnabled()).thenReturn(false);
+    for (int i = 0; i < 1000; i++) {
+      logger.logIndividualFetchComplete(10, 100, 1000, "testType", ident);
+    }
+    verify(activeLogger, times(0)).info(anyString());
+    verify(aggregateLogger, times(1)).info(anyString(), Matchers.<Object[]>anyVararg());
+    // Per-fetch logging enabled: every fetch is logged and no further summary is emitted.
+    when(activeLogger.isInfoEnabled()).thenReturn(true);
+    for (int i = 0; i < 1000; i++) {
+      logger.logIndividualFetchComplete(10, 100, 1000, "testType", ident);
+    }
+    verify(activeLogger, times(1000)).info(anyString());
+    verify(aggregateLogger, times(1)).info(anyString(), Matchers.<Object[]>anyVararg());
+  }
 }