You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zh...@apache.org on 2017/03/28 19:47:43 UTC
[02/50] [abbrv] tez git commit: TEZ-3581. Add different logger to
enable suppressing logs for specific lines. Contributed by Harish Jaiprakash.
TEZ-3581. Add different logger to enable suppressing logs for specific
lines. Contributed by Harish Jaiprakash.
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d415197c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d415197c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d415197c
Branch: refs/heads/TEZ-1190
Commit: d415197c67562ee2859b240e8a7e316067c4ed6b
Parents: c0270cb
Author: Siddharth Seth <ss...@apache.org>
Authored: Sun Feb 5 19:08:08 2017 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Sun Feb 5 19:08:08 2017 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/dag/history/HistoryEventHandler.java | 23 +++--
.../resources/tez-container-log4j.properties | 8 ++
.../org/apache/tez/http/HttpConnection.java | 18 +++-
.../library/common/shuffle/ShuffleUtils.java | 92 ++++++++++++++------
.../common/shuffle/impl/ShuffleManager.java | 7 +-
.../orderedgrouped/ShuffleScheduler.java | 8 +-
.../common/shuffle/TestShuffleUtils.java | 34 ++++++--
8 files changed, 144 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/d415197c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 37438d9..c4a1d72 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3581. Add different logger to enable suppressing logs for specific lines.
TEZ-3601. Add another HistoryLogLevel to suppress TaskAttempts at specific levels
TEZ-3600. Fix flaky test: TestTokenCache
TEZ-3589. add a unit test for amKeepAlive not being shutdown if an app takes a long time to launch.
http://git-wip-us.apache.org/repos/asf/tez/blob/d415197c/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
index 79d1fc3..4fa1926 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,6 +45,8 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
public class HistoryEventHandler extends CompositeService {
private static Logger LOG = LoggerFactory.getLogger(HistoryEventHandler.class);
+ private static Logger LOG_CRITICAL_EVENTS =
+ LoggerFactory.getLogger(LOG.getName() + ".criticalEvents");
private final AppContext context;
private RecoveryService recoveryService;
@@ -59,6 +62,8 @@ public class HistoryEventHandler extends CompositeService {
private final ConcurrentHashMap<TezTaskAttemptID, DAGHistoryEvent> suppressedEvents =
new ConcurrentHashMap<>();
+ private final AtomicLong criticalEventCount = new AtomicLong();
+
public HistoryEventHandler(AppContext context) {
super(HistoryEventHandler.class.getName());
this.context = context;
@@ -141,12 +146,18 @@ public class HistoryEventHandler extends CompositeService {
historyLoggingService.handle(event);
}
- // TODO at some point we should look at removing this once
- // there is a UI in place
- LOG.info("[HISTORY]"
- + "[DAG:" + dagIdStr + "]"
- + "[Event:" + event.getHistoryEvent().getEventType().name() + "]"
- + ": " + event.getHistoryEvent().toString());
+ if (LOG_CRITICAL_EVENTS.isInfoEnabled()) {
+ // TODO at some point we should look at removing this once
+ // there is a UI in place
+ LOG_CRITICAL_EVENTS.info("[HISTORY]"
+ + "[DAG:" + dagIdStr + "]"
+ + "[Event:" + event.getHistoryEvent().getEventType().name() + "]"
+ + ": " + event.getHistoryEvent().toString());
+ } else {
+ if (criticalEventCount.incrementAndGet() % 1000 == 0) {
+ LOG.info("Got {} critical events", criticalEventCount);
+ }
+ }
}
private boolean shouldLogEvent(DAGHistoryEvent event) {
http://git-wip-us.apache.org/repos/asf/tez/blob/d415197c/tez-dag/src/main/resources/tez-container-log4j.properties
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/resources/tez-container-log4j.properties b/tez-dag/src/main/resources/tez-container-log4j.properties
index 4620a78..721cd67 100644
--- a/tez-dag/src/main/resources/tez-container-log4j.properties
+++ b/tez-dag/src/main/resources/tez-container-log4j.properties
@@ -36,3 +36,11 @@ log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} [%p] [%t] |%c{2}|: %m%n
#
log4j.appender.EventCounter=org.apache.hadoop.log.metrics.EventCounter
+# Disable loggers which log a lot; use this if you want to reduce the log sizes. Note that this
+# will affect the analyzer, since it relies on these log lines.
+# log4j.logger.org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager.fetch=WARN
+# log4j.logger.org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleScheduler.fetch=WARN
+# log4j.logger.org.apache.tez.http.HttpConnection.url=WARN
+
+# This should be part of the AM log4j.properties file; it will not work from this file.
+# log4j.logger.org.apache.tez.dag.history.HistoryEventHandler.criticalEvents=WARN
http://git-wip-us.apache.org/repos/asf/tez/blob/d415197c/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java
index d781e64..9bfe4e7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java
@@ -36,10 +36,12 @@ import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
public class HttpConnection extends BaseHttpConnection {
private static final Logger LOG = LoggerFactory.getLogger(HttpConnection.class);
+ private static final Logger URL_LOG = LoggerFactory.getLogger(LOG.getName() + ".url");
private URL url;
private final String logIdentifier;
@@ -56,6 +58,7 @@ public class HttpConnection extends BaseHttpConnection {
private final HttpConnectionParams httpConnParams;
private final StopWatch stopWatch;
+ private final AtomicLong urlLogCount;
/**
* HttpConnection
@@ -73,6 +76,7 @@ public class HttpConnection extends BaseHttpConnection {
this.httpConnParams = connParams;
this.url = url;
this.stopWatch = new StopWatch();
+ this.urlLogCount = new AtomicLong();
if (LOG.isDebugEnabled()) {
LOG.debug("MapOutput URL :" + url.toString());
}
@@ -229,9 +233,17 @@ public class HttpConnection extends BaseHttpConnection {
// verify that replyHash is HMac of encHash
SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecretMgr);
- //Following log statement will be used by tez-tool perf-analyzer for mapping attempt to NM host
- LOG.info("for url=" + url +
- " sent hash and receievd reply " + stopWatch.now(TimeUnit.MILLISECONDS) + " ms");
+ if (URL_LOG.isInfoEnabled()) {
+ // Following log statement will be used by tez-tool perf-analyzer for mapping attempt to NM
+ // host
+ URL_LOG.info("for url=" + url + " sent hash and receievd reply " +
+ stopWatch.now(TimeUnit.MILLISECONDS) + " ms");
+ } else {
+ // Log summary.
+ if (urlLogCount.incrementAndGet() % 1000 == 0) {
+ LOG.info("Sent hash and recieved reply for {} urls", urlLogCount);
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/tez/blob/d415197c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index aa07233..82e844d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -29,6 +29,7 @@ import java.text.DecimalFormat;
import java.util.BitSet;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Deflater;
import javax.annotation.Nullable;
@@ -516,35 +517,70 @@ public class ShuffleUtils {
return builder.build();
}
- /**
- * Log individual fetch complete event.
- * This log information would be used by tez-tool/perf-analzyer/shuffle tools for mining
- * - amount of data transferred between source to destination machine
- * - time taken to transfer data between source to destination machine
- * - details on DISK/DISK_DIRECT/MEMORY based shuffles
- *
- * @param log
- * @param millis
- * @param bytesCompressed
- * @param bytesDecompressed
- * @param outputType
- * @param srcAttemptIdentifier
- */
- public static void logIndividualFetchComplete(Logger log, long millis, long
- bytesCompressed,
- long bytesDecompressed, String outputType, InputAttemptIdentifier srcAttemptIdentifier) {
- double rate = 0;
- if (millis != 0) {
- rate = bytesCompressed / ((double) millis / 1000);
- rate = rate / (1024 * 1024);
+ public static class FetchStatsLogger {
+ private final Logger activeLogger;
+ private final Logger aggregateLogger;
+ private final AtomicLong logCount = new AtomicLong();
+ private final AtomicLong compressedSize = new AtomicLong();
+ private final AtomicLong decompressedSize = new AtomicLong();
+ private final AtomicLong totalTime = new AtomicLong();
+
+ public FetchStatsLogger(Logger activeLogger, Logger aggregateLogger) {
+ this.activeLogger = activeLogger;
+ this.aggregateLogger = aggregateLogger;
+ }
+
+ /**
+ * Log individual fetch complete event.
+ * This log information would be used by tez-tool/perf-analyzer/shuffle tools for mining
+ * - amount of data transferred between source to destination machine
+ * - time taken to transfer data between source to destination machine
+ * - details on DISK/DISK_DIRECT/MEMORY based shuffles
+ *
+ * @param millis
+ * @param bytesCompressed
+ * @param bytesDecompressed
+ * @param outputType
+ * @param srcAttemptIdentifier
+ */
+ public void logIndividualFetchComplete(long millis, long bytesCompressed,
+ long bytesDecompressed, String outputType, InputAttemptIdentifier srcAttemptIdentifier) {
+ double rate = 0;
+ if (millis != 0) {
+ rate = bytesCompressed / ((double) millis / 1000);
+ rate = rate / (1024 * 1024);
+ }
+ if (activeLogger.isInfoEnabled()) {
+ activeLogger.info(
+ "Completed fetch for attempt: "
+ + toShortString(srcAttemptIdentifier)
+ +" to " + outputType +
+ ", csize=" + bytesCompressed + ", dsize=" + bytesDecompressed +
+ ", EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" +
+ MBPS_FORMAT.get().format(rate) + " MB/s");
+ } else {
+ long currentCount, currentCompressedSize, currentDecompressedSize, currentTotalTime;
+ synchronized (this) {
+ currentCount = logCount.incrementAndGet();
+ currentCompressedSize = compressedSize.addAndGet(bytesCompressed);
+ currentDecompressedSize = decompressedSize.addAndGet(bytesDecompressed);
+ currentTotalTime = totalTime.addAndGet(millis);
+ if (currentCount % 1000 == 0) {
+ compressedSize.set(0);
+ decompressedSize.set(0);
+ totalTime.set(0);
+ }
+ }
+ if (currentCount % 1000 == 0) {
+ double avgRate = currentTotalTime == 0 ? 0
+ : currentCompressedSize / (double)currentTotalTime / 1000 / 1024 / 1024;
+ aggregateLogger.info("Completed {} fetches, stats for last 1000 fetches: "
+ + "avg csize: {}, avg dsize: {}, avgTime: {}, avgRate: {}", currentCount,
+ currentCompressedSize / 1000, currentDecompressedSize / 1000, currentTotalTime / 1000,
+ MBPS_FORMAT.get().format(avgRate));
+ }
+ }
}
- log.info(
- "Completed fetch for attempt: "
- + toShortString(srcAttemptIdentifier)
- +" to " + outputType +
- ", csize=" + bytesCompressed + ", dsize=" + bytesDecompressed +
- ", EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" +
- MBPS_FORMAT.get().format(rate) + " MB/s");
}
private static String toShortString(InputAttemptIdentifier inputAttemptIdentifier) {
http://git-wip-us.apache.org/repos/asf/tez/blob/d415197c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index d034b2e..b2ff51d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -80,6 +80,7 @@ import org.apache.tez.runtime.library.common.shuffle.HostPort;
import org.apache.tez.runtime.library.common.shuffle.InputHost;
import org.apache.tez.runtime.library.common.shuffle.InputHost.PartitionToInputs;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils.FetchStatsLogger;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
@@ -98,6 +99,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class ShuffleManager implements FetcherCallback {
private static final Logger LOG = LoggerFactory.getLogger(ShuffleManager.class);
+ private static final Logger LOG_FETCH = LoggerFactory.getLogger(LOG.getName() + ".fetch");
+ private static final FetchStatsLogger fetchStatsLogger = new FetchStatsLogger(LOG_FETCH, LOG);
private final InputContext inputContext;
private final int numInputs;
@@ -628,8 +631,8 @@ public class ShuffleManager implements FetcherCallback {
if (!completedInputSet.contains(inputIdentifier)) {
fetchedInput.commit();
committed = true;
- ShuffleUtils.logIndividualFetchComplete(LOG, copyDuration,
- fetchedBytes, decompressedLength, fetchedInput.getType().toString(), srcAttemptIdentifier);
+ fetchStatsLogger.logIndividualFetchComplete(copyDuration, fetchedBytes,
+ decompressedLength, fetchedInput.getType().toString(), srcAttemptIdentifier);
// Processing counters for completed and commit fetches only. Need
// additional counters for excessive fetches - which primarily comes
http://git-wip-us.apache.org/repos/asf/tez/blob/d415197c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 3d2c1ad..cce486c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -73,6 +73,7 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils.FetchStatsLogger;
import org.apache.tez.runtime.library.common.shuffle.HostPort;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapHost.HostPortPartition;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Type;
@@ -140,6 +141,9 @@ class ShuffleScheduler {
private final AtomicLong shuffleStart = new AtomicLong(0);
private static final Logger LOG = LoggerFactory.getLogger(ShuffleScheduler.class);
+ private static final Logger LOG_FETCH = LoggerFactory.getLogger(LOG.getName() + ".fetch");
+ private static final FetchStatsLogger fetchStatsLogger = new FetchStatsLogger(LOG_FETCH, LOG);
+
static final long INITIAL_PENALTY = 2000L; // 2 seconds
private static final float PENALTY_GROWTH_RATE = 1.3f;
@@ -576,8 +580,8 @@ class ShuffleScheduler {
}
output.commit();
- ShuffleUtils.logIndividualFetchComplete(LOG, millis, bytesCompressed,
- bytesDecompressed, output.getType().toString(), srcAttemptIdentifier);
+ fetchStatsLogger.logIndividualFetchComplete(millis, bytesCompressed, bytesDecompressed,
+ output.getType().toString(), srcAttemptIdentifier);
if (output.getType() == Type.DISK) {
bytesShuffledToDisk.increment(bytesCompressed);
} else if (output.getType() == Type.DISK_DIRECT) {
http://git-wip-us.apache.org/repos/asf/tez/blob/d415197c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
index 496468b..f21da7c 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
@@ -4,7 +4,6 @@ import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -24,7 +23,8 @@ import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.common.sort.impl.IFileOutputStream;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils.FetchStatsLogger;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
@@ -32,6 +32,7 @@ import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Matchers;
import org.slf4j.Logger;
import java.io.ByteArrayInputStream;
@@ -47,8 +48,11 @@ import java.util.Random;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
@@ -70,10 +74,6 @@ import static org.mockito.Mockito.when;
*/
public class TestShuffleUtils {
- private static final String HOST = "localhost";
- private static final int PORT = 8080;
- private static final String PATH_COMPONENT = "attempt";
-
private OutputContext outputContext;
private Configuration conf;
private FileSystem localFs;
@@ -313,4 +313,26 @@ public class TestShuffleUtils {
} catch (IOException e) {
}
}
+
+ @Test
+ public void testFetchStatsLogger() throws Exception {
+ Logger activeLogger = mock(Logger.class);
+ Logger aggregateLogger = mock(Logger.class);
+ FetchStatsLogger logger = new FetchStatsLogger(activeLogger, aggregateLogger);
+
+ InputAttemptIdentifier ident = new InputAttemptIdentifier(1, 1);
+ when(activeLogger.isInfoEnabled()).thenReturn(false);
+ for (int i = 0; i < 1000; i++) {
+ logger.logIndividualFetchComplete(10, 100, 1000, "testType", ident);
+ }
+ verify(activeLogger, times(0)).info(anyString());
+ verify(aggregateLogger, times(1)).info(anyString(), Matchers.<Object[]>anyVararg());
+
+ when(activeLogger.isInfoEnabled()).thenReturn(true);
+ for (int i = 0; i < 1000; i++) {
+ logger.logIndividualFetchComplete(10, 100, 1000, "testType", ident);
+ }
+ verify(activeLogger, times(1000)).info(anyString());
+ verify(aggregateLogger, times(1)).info(anyString(), Matchers.<Object[]>anyVararg());
+ }
}