You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text copy).
Posted to commits@storm.apache.org by et...@apache.org on 2018/08/10 17:12:44 UTC
[1/3] storm git commit: STORM-3133: Extend metrics on Nimbus and
LogViewer:
Repository: storm
Updated Branches:
refs/heads/master 16e500844 -> 154173a70
http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java
index b8468ec..ae8aff6 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java
@@ -27,6 +27,8 @@ import static org.apache.storm.DaemonConfig.LOGVIEWER_CLEANUP_INTERVAL_SECS;
import static org.apache.storm.DaemonConfig.LOGVIEWER_MAX_PER_WORKER_LOGS_SIZE_MB;
import static org.apache.storm.DaemonConfig.LOGVIEWER_MAX_SUM_WORKER_LOGS_SIZE_MB;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
@@ -48,8 +50,10 @@ import java.util.concurrent.TimeUnit;
import java.util.function.BinaryOperator;
import java.util.stream.StreamSupport;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.storm.StormTimer;
+import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
@@ -62,6 +66,9 @@ import org.slf4j.LoggerFactory;
*/
public class LogCleaner implements Runnable, Closeable {
private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class);
+ private static final Timer cleanupRoutineDuration = StormMetricsRegistry.registerTimer("logviewer:cleanup-routine-duration-ms");
+ private static final Histogram numFilesCleanedUp = StormMetricsRegistry.registerHistogram("logviewer:num-files-cleaned-up");
+ private static final Histogram diskSpaceFreed = StormMetricsRegistry.registerHistogram("logviewer:disk-space-freed-in-bytes");
private final Map<String, Object> stormConf;
private final Integer intervalSecs;
@@ -95,6 +102,8 @@ public class LogCleaner implements Runnable, Closeable {
LOG.info("configured max total size of worker logs: {} MB, max total size of worker logs per directory: {} MB",
maxSumWorkerLogsSizeMb, maxPerWorkerLogsSizeMb);
+ //Switch to CachedGauge if this starts to hurt performance
+ StormMetricsRegistry.registerGauge("logviewer:worker-log-dir-size", () -> FileUtils.sizeOf(logRootDir));
}
/**
@@ -131,7 +140,9 @@ public class LogCleaner implements Runnable, Closeable {
*/
@Override
public void run() {
- try {
+ int numFilesCleaned = 0;
+ long diskSpaceCleaned = 0L;
+ try (Timer.Context t = cleanupRoutineDuration.time()) {
final long nowMills = Time.currentTimeMillis();
Set<File> oldLogDirs = selectDirsForCleanup(nowMills);
@@ -142,30 +153,41 @@ public class LogCleaner implements Runnable, Closeable {
oldLogDirs.stream().map(File::getName).collect(joining(",")),
deadWorkerDirs.stream().map(File::getName).collect(joining(",")));
- deadWorkerDirs.forEach(Unchecked.consumer(dir -> {
+ for (File dir : deadWorkerDirs) {
String path = dir.getCanonicalPath();
- LOG.info("Cleaning up: Removing {}", path);
+ long sizeInBytes = FileUtils.sizeOf(dir);
+ LOG.info("Cleaning up: Removing {}, {} KB", path, sizeInBytes * 1e-3);
try {
Utils.forceDelete(path);
cleanupEmptyTopoDirectory(dir);
+ numFilesCleaned++;
+ diskSpaceCleaned += sizeInBytes;
} catch (Exception ex) {
+ ExceptionMeters.NUM_FILE_REMOVAL_EXCEPTIONS.mark();
LOG.error(ex.getMessage(), ex);
}
- }));
+ }
- perWorkerDirCleanup(maxPerWorkerLogsSizeMb * 1024 * 1024);
- globalLogCleanup(maxSumWorkerLogsSizeMb * 1024 * 1024);
+ final List<DeletionMeta> perWorkerDirCleanupMeta = perWorkerDirCleanup(maxPerWorkerLogsSizeMb * 1024 * 1024);
+ numFilesCleaned += perWorkerDirCleanupMeta.stream().mapToInt(meta -> meta.deletedFiles).sum();
+ diskSpaceCleaned += perWorkerDirCleanupMeta.stream().mapToLong(meta -> meta.deletedSize).sum();
+ final DeletionMeta globalLogCleanupMeta = globalLogCleanup(maxSumWorkerLogsSizeMb * 1024 * 1024);
+ numFilesCleaned += globalLogCleanupMeta.deletedFiles;
+ diskSpaceCleaned += globalLogCleanupMeta.deletedSize;
} catch (Exception ex) {
+ ExceptionMeters.NUM_CLEANUP_EXCEPTIONS.mark();
LOG.error("Exception while cleaning up old log.", ex);
}
+ numFilesCleanedUp.update(numFilesCleaned);
+ diskSpaceFreed.update(diskSpaceCleaned);
}
/**
* Delete the oldest files in each overloaded worker log dir.
*/
@VisibleForTesting
- List<Integer> perWorkerDirCleanup(long size) {
+ List<DeletionMeta> perWorkerDirCleanup(long size) {
return workerLogs.getAllWorkerDirs().stream()
.map(Unchecked.function(dir ->
directoryCleaner.deleteOldestWhileTooLarge(Collections.singletonList(dir), size, true, null)))
@@ -176,7 +198,7 @@ public class LogCleaner implements Runnable, Closeable {
* Delete the oldest files in overloaded worker-artifacts globally.
*/
@VisibleForTesting
- int globalLogCleanup(long size) throws Exception {
+ DeletionMeta globalLogCleanup(long size) throws Exception {
List<File> workerDirs = new ArrayList<>(workerLogs.getAllWorkerDirs());
Set<String> aliveWorkerDirs = new HashSet<>(workerLogs.getAliveWorkerDirs());
@@ -223,8 +245,8 @@ public class LogCleaner implements Runnable, Closeable {
@VisibleForTesting
FileFilter mkFileFilterForLogCleanup(long nowMillis) {
- final long cutoffAgeMillis = cleanupCutoffAgeMillis(nowMillis);
- return file -> !file.isFile() && lastModifiedTimeWorkerLogdir(file) <= cutoffAgeMillis;
+ //Doesn't it make more sense to do file.isDirectory here?
+ return file -> !file.isFile() && lastModifiedTimeWorkerLogdir(file) <= cleanupCutoffAgeMillis(nowMillis);
}
/**
@@ -235,7 +257,7 @@ public class LogCleaner implements Runnable, Closeable {
private long lastModifiedTimeWorkerLogdir(File logDir) {
long dirModified = logDir.lastModified();
- DirectoryStream<Path> dirStream = null;
+ DirectoryStream<Path> dirStream;
try {
dirStream = directoryCleaner.getStreamForDirectory(logDir);
} catch (IOException e) {
@@ -256,9 +278,7 @@ public class LogCleaner implements Runnable, Closeable {
LOG.error(ex.getMessage(), ex);
return dirModified;
} finally {
- if (DirectoryStream.class.isInstance(dirStream)) {
- IOUtils.closeQuietly(dirStream);
- }
+ IOUtils.closeQuietly(dirStream);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogFileDownloader.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogFileDownloader.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogFileDownloader.java
index 67b265d..bfb3065 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogFileDownloader.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogFileDownloader.java
@@ -18,12 +18,19 @@
package org.apache.storm.daemon.logviewer.utils;
+import com.codahale.metrics.Histogram;
+
import java.io.File;
import java.io.IOException;
import javax.ws.rs.core.Response;
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.metric.StormMetricsRegistry;
+
+
public class LogFileDownloader {
+ private static final Histogram fileDownloadSizeDistMB= StormMetricsRegistry.registerHistogram("logviewer:download-file-size-rounded-MB");
private final String logRoot;
private final String daemonLogRoot;
@@ -55,6 +62,7 @@ public class LogFileDownloader {
File file = new File(rootDir, fileName).getCanonicalFile();
if (file.exists()) {
if (isDaemon || resourceAuthorizer.isUserAllowedToAccessFile(user, fileName)) {
+ fileDownloadSizeDistMB.update(Math.round((double) file.length() / FileUtils.ONE_MB));
return LogviewerResponseBuilder.buildDownloadFile(file);
} else {
return LogviewerResponseBuilder.buildResponseUnauthorizedUser(user);
http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogviewerResponseBuilder.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogviewerResponseBuilder.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogviewerResponseBuilder.java
index b92a559..4c8a191 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogviewerResponseBuilder.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogviewerResponseBuilder.java
@@ -76,13 +76,18 @@ public class LogviewerResponseBuilder {
* @param file file to download
*/
public static Response buildDownloadFile(File file) throws IOException {
- // do not close this InputStream in method: it will be used from jetty server
- InputStream is = new FileInputStream(file);
- return Response.status(OK)
- .entity(wrapWithStreamingOutput(is))
- .type(MediaType.APPLICATION_OCTET_STREAM_TYPE)
- .header("Content-Disposition", "attachment; filename=\"" + file.getName() + "\"")
- .build();
+ try {
+ // do not close this InputStream in method: it will be used from jetty server
+ InputStream is = new FileInputStream(file);
+ return Response.status(OK)
+ .entity(wrapWithStreamingOutput(is))
+ .type(MediaType.APPLICATION_OCTET_STREAM_TYPE)
+ .header("Content-Disposition", "attachment; filename=\"" + file.getName() + "\"")
+ .build();
+ } catch (IOException e) {
+ ExceptionMeters.NUM_FILE_DOWNLOAD_EXCEPTIONS.mark();
+ throw e;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java
index eda2478..d566e3d 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java
@@ -22,7 +22,6 @@ import static java.util.stream.Collectors.toCollection;
import static java.util.stream.Collectors.toMap;
import static org.apache.storm.Config.SUPERVISOR_RUN_WORKER_AS_USER;
import static org.apache.storm.Config.TOPOLOGY_SUBMITTER_USER;
-import static org.apache.storm.daemon.utils.ListFunctionalSupport.takeLast;
import com.google.common.collect.Lists;
@@ -88,9 +87,14 @@ public class WorkerLogs {
if (runAsUser && topoOwner.isPresent() && file.exists() && !Files.isReadable(file.toPath())) {
LOG.debug("Setting permissions on file {} with topo-owner {}", fileName, topoOwner);
- ClientSupervisorUtils.processLauncherAndWait(stormConf, topoOwner.get(),
- Lists.newArrayList("blob", file.getCanonicalPath()), null,
- "setup group read permissions for file: " + fileName);
+ try {
+ ClientSupervisorUtils.processLauncherAndWait(stormConf, topoOwner.get(),
+ Lists.newArrayList("blob", file.getCanonicalPath()), null,
+ "setup group read permissions for file: " + fileName);
+ } catch (IOException e) {
+ ExceptionMeters.NUM_SET_PERMISSION_EXCEPTIONS.mark();
+ throw e;
+ }
}
}
@@ -127,7 +131,7 @@ public class WorkerLogs {
/**
* Return a sorted set of java.io.Files that were written by workers that are now active.
*/
- public SortedSet<String> getAliveWorkerDirs() throws Exception {
+ public SortedSet<String> getAliveWorkerDirs() {
Set<String> aliveIds = getAliveIds(Time.currentTimeSecs());
Set<File> logDirs = getAllWorkerDirs();
Map<String, File> idToDir = identifyWorkerLogDirs(logDirs);
@@ -177,7 +181,7 @@ public class WorkerLogs {
*
* @param nowSecs current time in seconds
*/
- public Set<String> getAliveIds(int nowSecs) throws Exception {
+ public Set<String> getAliveIds(int nowSecs) {
return SupervisorUtils.readWorkerHeartbeats(stormConf).entrySet().stream()
.filter(entry -> Objects.nonNull(entry.getValue())
&& !SupervisorUtils.isWorkerHbTimedOut(nowSecs, entry.getValue(), stormConf))
http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/webapp/LogviewerResource.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/webapp/LogviewerResource.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/webapp/LogviewerResource.java
index 85285ac..08881be 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/webapp/LogviewerResource.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/webapp/LogviewerResource.java
@@ -19,6 +19,7 @@
package org.apache.storm.daemon.logviewer.webapp;
import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
import java.io.IOException;
import java.net.URLDecoder;
@@ -38,6 +39,7 @@ import org.apache.storm.daemon.logviewer.handler.LogviewerLogDownloadHandler;
import org.apache.storm.daemon.logviewer.handler.LogviewerLogPageHandler;
import org.apache.storm.daemon.logviewer.handler.LogviewerLogSearchHandler;
import org.apache.storm.daemon.logviewer.handler.LogviewerProfileHandler;
+import org.apache.storm.daemon.logviewer.utils.ExceptionMeters;
import org.apache.storm.daemon.ui.InvalidRequestException;
import org.apache.storm.daemon.ui.UIHelpers;
import org.apache.storm.daemon.ui.resources.StormApiResource;
@@ -62,6 +64,14 @@ public class LogviewerResource {
"logviewer:num-download-log-daemon-file-http-requests");
private static final Meter meterListLogsHttpRequests = StormMetricsRegistry.registerMeter("logviewer:num-list-logs-http-requests");
+ private static final Meter numSearchLogRequests = StormMetricsRegistry.registerMeter("logviewer:num-search-logs-requests");
+ private static final Meter numDeepSearchArchived = StormMetricsRegistry.registerMeter(
+ "logviewer:num-deep-search-requests-with-archived");
+ private static final Meter numDeepSearchNonArchived = StormMetricsRegistry.registerMeter(
+ "logviewer:num-deep-search-requests-without-archived");
+ private static final Timer searchLogRequestDuration = StormMetricsRegistry.registerTimer("logviewer:search-requests-duration-ms");
+ private static final Timer deepSearchRequestDuration = StormMetricsRegistry.registerTimer("logviewer:deep-search-request-duration-ms");
+
private final LogviewerLogPageHandler logviewer;
private final LogviewerProfileHandler profileHandler;
private final LogviewerLogDownloadHandler logDownloadHandler;
@@ -105,6 +115,9 @@ public class LogviewerResource {
} catch (InvalidRequestException e) {
LOG.error(e.getMessage(), e);
return Response.status(400).entity(e.getMessage()).build();
+ } catch (IOException e) {
+ ExceptionMeters.NUM_READ_LOG_EXCEPTIONS.mark();
+ throw e;
}
}
@@ -126,6 +139,9 @@ public class LogviewerResource {
} catch (InvalidRequestException e) {
LOG.error(e.getMessage(), e);
return Response.status(400).entity(e.getMessage()).build();
+ } catch (IOException e) {
+ ExceptionMeters.NUM_READ_DAEMON_LOG_EXCEPTIONS.mark();
+ throw e;
}
}
@@ -158,7 +174,12 @@ public class LogviewerResource {
String callback = request.getParameter(StormApiResource.callbackParameterName);
String origin = request.getHeader("Origin");
- return logviewer.listLogFiles(user, portStr != null ? Integer.parseInt(portStr) : null, topologyId, callback, origin);
+ try {
+ return logviewer.listLogFiles(user, portStr != null ? Integer.parseInt(portStr) : null, topologyId, callback, origin);
+ } catch (IOException e) {
+ ExceptionMeters.NUM_LIST_LOG_EXCEPTIONS.mark();
+ throw e;
+ }
}
/**
@@ -169,7 +190,12 @@ public class LogviewerResource {
public Response listDumpFiles(@PathParam("topo-id") String topologyId, @PathParam("host-port") String hostPort,
@Context HttpServletRequest request) throws IOException {
String user = httpCredsHandler.getUserName(request);
- return profileHandler.listDumpFiles(topologyId, hostPort, user);
+ try {
+ return profileHandler.listDumpFiles(topologyId, hostPort, user);
+ } catch (IOException e) {
+ ExceptionMeters.NUM_LIST_DUMP_EXCEPTIONS.mark();
+ throw e;
+ }
}
/**
@@ -180,7 +206,12 @@ public class LogviewerResource {
public Response downloadDumpFile(@PathParam("topo-id") String topologyId, @PathParam("host-port") String hostPort,
@PathParam("filename") String fileName, @Context HttpServletRequest request) throws IOException {
String user = httpCredsHandler.getUserName(request);
- return profileHandler.downloadDumpFile(topologyId, hostPort, fileName, user);
+ try {
+ return profileHandler.downloadDumpFile(topologyId, hostPort, fileName, user);
+ } catch (IOException e) {
+ ExceptionMeters.NUM_DOWNLOAD_DUMP_EXCEPTIONS.mark();
+ throw e;
+ }
}
/**
@@ -194,7 +225,12 @@ public class LogviewerResource {
String user = httpCredsHandler.getUserName(request);
String file = request.getParameter("file");
String decodedFileName = URLDecoder.decode(file);
- return logDownloadHandler.downloadLogFile(decodedFileName, user);
+ try {
+ return logDownloadHandler.downloadLogFile(decodedFileName, user);
+ } catch (IOException e) {
+ ExceptionMeters.NUM_DOWNLOAD_LOG_EXCEPTIONS.mark();
+ throw e;
+ }
}
/**
@@ -208,7 +244,12 @@ public class LogviewerResource {
String user = httpCredsHandler.getUserName(request);
String file = request.getParameter("file");
String decodedFileName = URLDecoder.decode(file);
- return logDownloadHandler.downloadDaemonLogFile(decodedFileName, user);
+ try {
+ return logDownloadHandler.downloadDaemonLogFile(decodedFileName, user);
+ } catch (IOException e) {
+ ExceptionMeters.NUM_DOWNLOAD_DAEMON_LOG_EXCEPTIONS.mark();
+ throw e;
+ }
}
/**
@@ -217,6 +258,8 @@ public class LogviewerResource {
@GET
@Path("/search")
public Response search(@Context HttpServletRequest request) throws IOException {
+ numSearchLogRequests.mark();
+
String user = httpCredsHandler.getUserName(request);
boolean isDaemon = StringUtils.equals(request.getParameter("is-daemon"), "yes");
String file = request.getParameter("file");
@@ -227,14 +270,17 @@ public class LogviewerResource {
String callback = request.getParameter(StormApiResource.callbackParameterName);
String origin = request.getHeader("Origin");
- try {
- return logSearchHandler.searchLogFile(decodedFileName, user, isDaemon, searchString, numMatchesStr,
- startByteOffset, callback, origin);
+ try (Timer.Context t = searchLogRequestDuration.time()) {
+ return logSearchHandler.searchLogFile(decodedFileName, user, isDaemon,
+ searchString, numMatchesStr, startByteOffset, callback, origin);
} catch (InvalidRequestException e) {
LOG.error(e.getMessage(), e);
int statusCode = 400;
return new JsonResponseBuilder().setData(UIHelpers.exceptionToJson(e, statusCode)).setCallback(callback)
- .setStatus(statusCode).build();
+ .setStatus(statusCode).build();
+ } catch (IOException e) {
+ ExceptionMeters.NUM_SEARCH_EXCEPTIONS.mark();
+ throw e;
}
}
@@ -244,7 +290,7 @@ public class LogviewerResource {
@GET
@Path("/deepSearch/{topoId}")
public Response deepSearch(@PathParam("topoId") String topologyId,
- @Context HttpServletRequest request) throws IOException {
+ @Context HttpServletRequest request) {
String user = httpCredsHandler.getUserName(request);
String searchString = request.getParameter("search-string");
String numMatchesStr = request.getParameter("num-matches");
@@ -255,8 +301,16 @@ public class LogviewerResource {
String callback = request.getParameter(StormApiResource.callbackParameterName);
String origin = request.getHeader("Origin");
- return logSearchHandler.deepSearchLogsForTopology(topologyId, user, searchString, numMatchesStr, portStr,
- startFileOffset, startByteOffset, BooleanUtils.toBooleanObject(searchArchived), callback, origin);
+ Boolean alsoSearchArchived = BooleanUtils.toBooleanObject(searchArchived);
+ if (BooleanUtils.isTrue(alsoSearchArchived)) {
+ numDeepSearchArchived.mark();
+ } else {
+ numDeepSearchNonArchived.mark();
+ }
+ try (Timer.Context t = deepSearchRequestDuration.time()) {
+ return logSearchHandler.deepSearchLogsForTopology(topologyId, user, searchString, numMatchesStr, portStr, startFileOffset,
+ startByteOffset, alsoSearchArchived, callback, origin);
+ }
}
private int parseIntegerFromMap(Map<String, String[]> map, String parameterKey) throws InvalidRequestException {
http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandlerTest.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandlerTest.java b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandlerTest.java
index c7a1fd8..0a450be 100644
--- a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandlerTest.java
+++ b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandlerTest.java
@@ -627,6 +627,7 @@ public class LogviewerLogSearchHandlerTest {
public static class TestDeepSearchLogs {
+ public static final int METRIC_SCANNED_FILES = 0;
private List<File> logFiles;
private String topoPath;
@@ -857,7 +858,7 @@ public class LogviewerLogSearchHandlerTest {
int fileOffset = (Integer) arguments[2];
String search = (String) arguments[4];
- return new LogviewerLogSearchHandler.Matched(fileOffset, search, Collections.emptyList());
+ return new LogviewerLogSearchHandler.Matched(fileOffset, search, Collections.emptyList(), METRIC_SCANNED_FILES);
}).when(handler).findNMatches(any(), anyInt(), anyInt(), anyInt(), any());
return handler;
http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java
index 491de54..8b1c0b4 100644
--- a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java
+++ b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java
@@ -159,7 +159,10 @@ public class LogCleanerTest {
WorkerLogs workerLogs = new WorkerLogs(conf, rootDir);
LogCleaner logCleaner = new LogCleaner(conf, workerLogs, mockDirectoryCleaner, rootDir);
- List<Integer> deletedFiles = logCleaner.perWorkerDirCleanup(1200);
+ List<Integer> deletedFiles = logCleaner.perWorkerDirCleanup(1200)
+ .stream()
+ .map(deletionMeta -> deletionMeta.deletedFiles)
+ .collect(toList());
assertEquals(Integer.valueOf(4), deletedFiles.get(0));
assertEquals(Integer.valueOf(4), deletedFiles.get(1));
assertEquals(Integer.valueOf(4), deletedFiles.get(deletedFiles.size() - 1));
@@ -218,13 +221,13 @@ public class LogCleanerTest {
Map<String, Object> conf = Utils.readStormConfig();
WorkerLogs stubbedWorkerLogs = new WorkerLogs(conf, rootDir) {
@Override
- public SortedSet<String> getAliveWorkerDirs() throws Exception {
+ public SortedSet<String> getAliveWorkerDirs() {
return new TreeSet<>(Collections.singletonList("/workers-artifacts/topo1/port1"));
}
};
LogCleaner logCleaner = new LogCleaner(conf, stubbedWorkerLogs, mockDirectoryCleaner, rootDir);
- int deletedFiles = logCleaner.globalLogCleanup(2400);
+ int deletedFiles = logCleaner.globalLogCleanup(2400).deletedFiles;
assertEquals(18, deletedFiles);
} finally {
Utils.setInstance(prevUtils);
[3/3] storm git commit: Merge branch 'STORM-3133' of
https://github.com/zd-project/storm into STORM-3133
Posted by et...@apache.org.
Merge branch 'STORM-3133' of https://github.com/zd-project/storm into STORM-3133
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/154173a7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/154173a7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/154173a7
Branch: refs/heads/master
Commit: 154173a70fb0e1b11c08be9b4499c811c30163f1
Parents: 16e5008 bf81b68
Author: Ethan Li <et...@gmail.com>
Authored: Fri Aug 10 12:11:56 2018 -0500
Committer: Ethan Li <et...@gmail.com>
Committed: Fri Aug 10 12:11:56 2018 -0500
----------------------------------------------------------------------
.../org/apache/storm/scheduler/WorkerSlot.java | 7 +
.../org/apache/storm/daemon/nimbus/Nimbus.java | 228 ++++++++++++-------
.../daemon/supervisor/SupervisorUtils.java | 9 +-
.../storm/localizer/LocallyCachedBlob.java | 3 +-
.../storm/metric/StormMetricsRegistry.java | 25 +-
.../storm/nimbus/LeaderListenerCallback.java | 7 +
.../org/apache/storm/scheduler/Cluster.java | 12 +-
.../apache/storm/scheduler/ExecutorDetails.java | 9 +-
.../storm/metric/StormMetricsRegistryTest.java | 111 +++++++++
.../storm/daemon/logviewer/LogviewerServer.java | 6 +-
.../handler/LogviewerLogPageHandler.java | 102 +++++----
.../handler/LogviewerLogSearchHandler.java | 159 ++++++++-----
.../daemon/logviewer/utils/DeletionMeta.java | 31 +++
.../logviewer/utils/DirectoryCleaner.java | 25 +-
.../daemon/logviewer/utils/ExceptionMeters.java | 66 ++++++
.../daemon/logviewer/utils/LogCleaner.java | 48 ++--
.../logviewer/utils/LogFileDownloader.java | 8 +
.../utils/LogviewerResponseBuilder.java | 19 +-
.../daemon/logviewer/utils/WorkerLogs.java | 16 +-
.../logviewer/webapp/LogviewerResource.java | 78 ++++++-
.../handler/LogviewerLogSearchHandlerTest.java | 3 +-
.../daemon/logviewer/utils/LogCleanerTest.java | 9 +-
22 files changed, 722 insertions(+), 259 deletions(-)
----------------------------------------------------------------------
[2/3] storm git commit: STORM-3133: Extend metrics on Nimbus and
LogViewer:
Posted by et...@apache.org.
STORM-3133: Extend metrics on Nimbus and LogViewer:
STORM-3157: Added registration method for MetricSet
STORM-3133: Refactored and added metrics to LogViewer components
STORM-3133: Fixed up Unit test for LogViewer
STORM-3133: Refactored and added metrics to Nimbus components.
STORM-3133: Add nimbus scheduling metrics
STORM-3133: Add metrics for disk usage of workers' logs and performance of LogCleaner routine
STORM-3133: Refactored code and added file partial read count metric for logviewer
STORM-3133: Add metrics for counting LogViewer's IOExceptions
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bf81b684
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bf81b684
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bf81b684
Branch: refs/heads/master
Commit: bf81b6840dba16b506187c53448db8e09a6a9f14
Parents: c9efe3b
Author: Zhengdai Hu <hu...@gmail.com>
Authored: Fri Jul 20 13:41:51 2018 -0500
Committer: Zhengdai Hu <zh...@oath.com>
Committed: Fri Aug 10 12:01:36 2018 -0500
----------------------------------------------------------------------
.../org/apache/storm/scheduler/WorkerSlot.java | 7 +
.../org/apache/storm/daemon/nimbus/Nimbus.java | 228 ++++++++++++-------
.../daemon/supervisor/SupervisorUtils.java | 9 +-
.../storm/localizer/LocallyCachedBlob.java | 3 +-
.../storm/metric/StormMetricsRegistry.java | 25 +-
.../storm/nimbus/LeaderListenerCallback.java | 7 +
.../org/apache/storm/scheduler/Cluster.java | 12 +-
.../apache/storm/scheduler/ExecutorDetails.java | 9 +-
.../storm/metric/StormMetricsRegistryTest.java | 111 +++++++++
.../storm/daemon/logviewer/LogviewerServer.java | 6 +-
.../handler/LogviewerLogPageHandler.java | 102 +++++----
.../handler/LogviewerLogSearchHandler.java | 159 ++++++++-----
.../daemon/logviewer/utils/DeletionMeta.java | 31 +++
.../logviewer/utils/DirectoryCleaner.java | 25 +-
.../daemon/logviewer/utils/ExceptionMeters.java | 66 ++++++
.../daemon/logviewer/utils/LogCleaner.java | 48 ++--
.../logviewer/utils/LogFileDownloader.java | 8 +
.../utils/LogviewerResponseBuilder.java | 19 +-
.../daemon/logviewer/utils/WorkerLogs.java | 16 +-
.../logviewer/webapp/LogviewerResource.java | 78 ++++++-
.../handler/LogviewerLogSearchHandlerTest.java | 3 +-
.../daemon/logviewer/utils/LogCleanerTest.java | 9 +-
22 files changed, 722 insertions(+), 259 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java b/storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
index 07064db..fa963d2 100644
--- a/storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
+++ b/storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
@@ -12,6 +12,9 @@
package org.apache.storm.scheduler;
+import java.util.Arrays;
+import java.util.List;
+
public class WorkerSlot {
private final String nodeId;
private final int port;
@@ -39,6 +42,10 @@ public class WorkerSlot {
return getNodeId() + ":" + getPort();
}
+ public List<Object> toList() {
+ return Arrays.asList(nodeId, (long) port);
+ }
+
@Override
public int hashCode() {
return nodeId.hashCode() + 13 * ((Integer) port).hashCode();
http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index c401f60..a096217 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -18,9 +18,9 @@
package org.apache.storm.daemon.nimbus;
-import com.codahale.metrics.ExponentiallyDecayingReservoir;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@@ -37,6 +37,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -45,6 +46,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -53,6 +55,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.security.auth.Subject;
+
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.DaemonConfig;
@@ -181,6 +184,8 @@ import org.apache.storm.security.auth.workertoken.WorkerTokenManager;
import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.storm.shade.com.google.common.base.Strings;
import org.apache.storm.shade.com.google.common.collect.ImmutableMap;
+import org.apache.storm.shade.com.google.common.collect.MapDifference;
+import org.apache.storm.shade.com.google.common.collect.Maps;
import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
import org.apache.storm.shade.org.apache.zookeeper.ZooDefs;
import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
@@ -251,10 +256,18 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
private static final Meter getTopologyPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getTopologyPageInfo-calls");
private static final Meter getSupervisorPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getSupervisorPageInfo-calls");
private static final Meter getComponentPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getComponentPageInfo-calls");
- private static final Histogram scheduleTopologyTimeMs = StormMetricsRegistry.registerHistogram("nimbus:time-scheduleTopology-ms",
- new ExponentiallyDecayingReservoir());
private static final Meter getOwnerResourceSummariesCalls = StormMetricsRegistry.registerMeter(
"nimbus:num-getOwnerResourceSummaries-calls");
+ //Timer
+ private static final Timer fileUploadDuration = StormMetricsRegistry.registerTimer("nimbus:files-upload-duration-ms");
+ private static final Timer schedulingDuration = StormMetricsRegistry.registerTimer("nimbus:topology-scheduling-duration-ms");
+ //Scheduler histogram
+ private static final Histogram numAddedExecPerScheduling = StormMetricsRegistry.registerHistogram("nimbus:num-added-executors-per-scheduling");
+ private static final Histogram numAddedSlotPerScheduling = StormMetricsRegistry.registerHistogram("nimbus:num-added-slots-per-scheduling");
+ private static final Histogram numRemovedExecPerScheduling = StormMetricsRegistry.registerHistogram("nimbus:num-removed-executors-per-scheduling");
+ private static final Histogram numRemovedSlotPerScheduling = StormMetricsRegistry.registerHistogram("nimbus:num-removed-slots-per-scheduling");
+ private static final Histogram numNetExecIncreasePerScheduling = StormMetricsRegistry.registerHistogram("nimbus:num-net-executors-increase-per-scheduling");
+ private static final Histogram numNetSlotIncreasePerScheduling = StormMetricsRegistry.registerHistogram("nimbus:num-net-slots-increase-per-scheduling");
// END Metrics
private static final Meter shutdownCalls = StormMetricsRegistry.registerMeter("nimbus:num-shutdown-calls");
private static final Meter processWorkerMetricsCalls = StormMetricsRegistry.registerMeter("nimbus:process-worker-metric-calls");
@@ -411,6 +424,10 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
private final StormTimer timer;
private final IScheduler scheduler;
private final IScheduler underlyingScheduler;
+ //Metrics related
+ private final AtomicReference<Long> schedulingStartTimeNs = new AtomicReference<>(null);
+ private final AtomicLong longestSchedulingTime = new AtomicLong();
+
private final ILeaderElector leaderElector;
private final AssignmentDistributionService assignmentsDistributer;
private final AtomicReference<Map<String, String>> idToSchedStatus;
@@ -550,6 +567,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
});
}
+ //Not symmetric difference. Performing A.entrySet() - B.entrySet()
private static <K, V> Map<K, V> mapDiff(Map<? extends K, ? extends V> first, Map<? extends K, ? extends V> second) {
Map<K, V> ret = new HashMap<>();
for (Entry<? extends K, ? extends V> entry : second.entrySet()) {
@@ -689,26 +707,20 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
* @return {topology-id -> {executor [node port]}} mapping
*/
private static Map<String, Map<List<Long>, List<Object>>> computeTopoToExecToNodePort(
- Map<String, SchedulerAssignment> schedAssignments) {
+ Map<String, SchedulerAssignment> schedAssignments, List<String> assignedTopologyIds) {
+ // assignedTopologyIds: topologies that must appear as keys in the result even if the scheduler
+ // produced no assignment for them (see the loop at the end of this method).
Map<String, Map<List<Long>, List<Object>>> ret = new HashMap<>();
for (Entry<String, SchedulerAssignment> schedEntry : schedAssignments.entrySet()) {
Map<List<Long>, List<Object>> execToNodePort = new HashMap<>();
for (Entry<ExecutorDetails, WorkerSlot> execAndNodePort : schedEntry.getValue().getExecutorToSlot().entrySet()) {
ExecutorDetails exec = execAndNodePort.getKey();
WorkerSlot slot = execAndNodePort.getValue();
-
- List<Long> listExec = new ArrayList<>(2);
- listExec.add((long) exec.getStartTask());
- listExec.add((long) exec.getEndTask());
-
- List<Object> nodePort = new ArrayList<>(2);
- nodePort.add(slot.getNodeId());
- nodePort.add((long) slot.getPort());
-
- execToNodePort.put(listExec, nodePort);
+ // exec.toList() yields [startTask, endTask] as longs, replacing the hand-built list removed above.
+ // slot.toList() presumably yields [nodeId, (long) port] like the removed code did -- confirm in WorkerSlot.
+ execToNodePort.put(exec.toList(), slot.toList());
}
ret.put(schedEntry.getKey(), execToNodePort);
}
+ // Topologies without a scheduler assignment map to null (putIfAbsent leaves computed entries alone);
+ // the caller substitutes an empty map for a null value.
+ for (String id : assignedTopologyIds) {
+ ret.putIfAbsent(id, null);
+ }
return ret;
}
@@ -735,39 +747,95 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
return ret;
}
- private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort(
- Map<String, SchedulerAssignment> schedAssignments, Map<String, Assignment> existingAssignments) {
- Map<String, Map<List<Long>, List<Object>>> ret = computeTopoToExecToNodePort(schedAssignments);
- // Print some useful information
- if (existingAssignments != null && !existingAssignments.isEmpty()) {
- for (Entry<String, Map<List<Long>, List<Object>>> entry : ret.entrySet()) {
- String topoId = entry.getKey();
- Map<List<Long>, List<Object>> execToNodePort = entry.getValue();
- Assignment assignment = existingAssignments.get(topoId);
- if (assignment == null) {
- continue;
+ /**
+ * Logs how the new assignments differ from the existing ones and records per-scheduling-round
+ * histograms of added, removed and net executors/slots. When anything changed, it also logs cluster
+ * fragmentation and per-node resource usage.
+ *
+ * @param existingAssignments assignments before this scheduling round; must not be null
+ * @param newAssignments assignments produced by this round; must not be null
+ * @return true if the assignments changed
+ */
+ private boolean auditAssignmentChanges(Map<String, Assignment> existingAssignments,
+ Map<String, Assignment> newAssignments) {
+ assert existingAssignments != null && newAssignments != null;
+ // XOR: true when exactly one side is empty. When both are non-empty it is recomputed from the
+ // map difference below; when both are empty nothing changed.
+ boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty();
+ long numRemovedExec = 0;
+ long numRemovedSlot = 0;
+ long numAddedExec = 0;
+ long numAddedSlot = 0;
+ if (existingAssignments.isEmpty()) {
+ for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
+ final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
+ final long count = new HashSet<>(execToPort.values()).size();
+ LOG.info("Assigning {} to {} slots", entry.getKey(), count);
+ LOG.info("Assign executors: {}", execToPort.keySet());
+ numAddedSlot += count;
+ numAddedExec += execToPort.size();
+ }
+ } else if (newAssignments.isEmpty()) {
+ for (Entry<String, Assignment> entry : existingAssignments.entrySet()) {
+ final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
+ final long count = new HashSet<>(execToPort.values()).size();
+ LOG.info("Removing {} from {} slots", entry.getKey(), count);
+ LOG.info("Remove executors: {}", execToPort.keySet());
+ numRemovedSlot += count;
+ numRemovedExec += execToPort.size();
+ }
+ } else {
+ MapDifference<String, Assignment> difference = Maps.difference(existingAssignments, newAssignments);
+ // Deliberate assignment inside the condition: records whether the two maps differ at all.
+ if (anyChanged = !difference.areEqual()) {
+ // Topologies present only in the existing assignments were removed.
+ for (Entry<String, Assignment> entry : difference.entriesOnlyOnLeft().entrySet()) {
+ final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
+ final long count = new HashSet<>(execToPort.values()).size();
+ LOG.info("Removing {} from {} slots", entry.getKey(), count);
+ LOG.info("Remove executors: {}", execToPort.keySet());
+ numRemovedSlot += count;
+ numRemovedExec += execToPort.size();
}
- Map<List<Long>, NodeInfo> old = assignment.get_executor_node_port();
- Map<List<Long>, List<Object>> reassigned = new HashMap<>();
- for (Entry<List<Long>, List<Object>> execAndNodePort : execToNodePort.entrySet()) {
- NodeInfo oldAssigned = old.get(execAndNodePort.getKey());
- String node = (String) execAndNodePort.getValue().get(0);
- Long port = (Long) execAndNodePort.getValue().get(1);
- if (oldAssigned == null || !oldAssigned.get_node().equals(node)
- || !port.equals(oldAssigned.get_port_iterator().next())) {
- reassigned.put(execAndNodePort.getKey(), execAndNodePort.getValue());
- }
+ // Topologies present only in the new assignments were added.
+ for (Entry<String, Assignment> entry : difference.entriesOnlyOnRight().entrySet()) {
+ final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
+ final long count = new HashSet<>(execToPort.values()).size();
+ LOG.info("Assigning {} to {} slots", entry.getKey(), count);
+ LOG.info("Assign executors: {}", execToPort.keySet());
+ numAddedSlot += count;
+ numAddedExec += execToPort.size();
}
+ // Topologies on both sides whose assignment differs. An executor counts as unchanged only if it
+ // kept the same slot; a slot counts as unchanged only if at least one such executor stayed on it.
+ for (Entry<String, MapDifference.ValueDifference<Assignment>> entry : difference.entriesDiffering().entrySet()) {
+ final Map<List<Long>, NodeInfo> execToSlot = entry.getValue().rightValue().get_executor_node_port();
+ final Set<NodeInfo> slots = new HashSet<>(execToSlot.values());
+ LOG.info("Reassigning {} to {} slots", entry.getKey(), slots.size());
+ LOG.info("Reassign executors: {}", execToSlot.keySet());
+
+ final Map<List<Long>, NodeInfo> oldExecToSlot = entry.getValue().leftValue().get_executor_node_port();
+
+ long commonExecCount = 0;
+ Set<NodeInfo> commonSlots = new HashSet<>(execToSlot.size());
+ for (Entry<List<Long>, NodeInfo> execEntry : execToSlot.entrySet()) {
+ if (execEntry.getValue().equals(oldExecToSlot.get(execEntry.getKey()))) {
+ commonExecCount++;
+ commonSlots.add(execEntry.getValue());
+ }
+ }
+ long commonSlotCount = commonSlots.size();
- if (!reassigned.isEmpty()) {
- int count = (new HashSet<>(execToNodePort.values())).size();
- Set<List<Long>> reExecs = reassigned.keySet();
- LOG.info("Reassigning {} to {} slots", topoId, count);
- LOG.info("Reassign executors: {}", reExecs);
+ //Treat reassign as remove and add
+ numRemovedSlot += new HashSet<>(oldExecToSlot.values()).size() - commonSlotCount;
+ numRemovedExec += oldExecToSlot.size() - commonExecCount;
+ numAddedSlot += slots.size() - commonSlotCount;
+ numAddedExec += execToSlot.size() - commonExecCount;
}
}
+ LOG.debug("{} assignments unchanged: {}", difference.entriesInCommon().size(), difference.entriesInCommon().keySet());
}
- return ret;
+ // The histograms are updated on every call, including rounds where nothing changed (all zeros).
+ numAddedExecPerScheduling.update(numAddedExec);
+ numAddedSlotPerScheduling.update(numAddedSlot);
+ numRemovedExecPerScheduling.update(numRemovedExec);
+ numRemovedSlotPerScheduling.update(numRemovedSlot);
+ numNetExecIncreasePerScheduling.update(numAddedExec - numRemovedExec);
+ numNetSlotIncreasePerScheduling.update(numAddedSlot - numRemovedSlot);
+
+ if (anyChanged) {
+ LOG.info("Fragmentation after scheduling is: {} MB, {} PCore CPUs", fragmentedMemory(), fragmentedCpu());
+ nodeIdToResources.get().forEach((id, node) ->
+ LOG.info(
+ "Node Id: {} Total Mem: {}, Used Mem: {}, Available Mem: {}, Total CPU: {}, Used "
+ + "CPU: {}, Available CPU: {}, fragmented: {}",
+ id, node.getTotalMem(), node.getUsedMem(), node.getAvailableMem(),
+ node.getTotalCpu(), node.getUsedCpu(), node.getAvailableCpu(), isFragmented(node)));
+ }
+ return anyChanged;
}
private static List<List<Long>> changedExecutors(Map<List<Long>, NodeInfo> map, Map<List<Long>,
@@ -780,7 +848,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
key.add(ni.get_node());
key.add(ni.get_port_iterator().next());
List<List<Long>> value = new ArrayList<>(entry.getValue());
- value.sort((a, b) -> a.get(0).compareTo(b.get(0)));
+ value.sort(Comparator.comparing(a -> a.get(0)));
slotAssigned.put(key, value);
}
HashMap<List<Object>, List<List<Long>>> tmpNewSlotAssigned = newExecToNodePort == null ? new HashMap<>() :
@@ -788,7 +856,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
HashMap<List<Object>, List<List<Long>>> newSlotAssigned = new HashMap<>();
for (Entry<List<Object>, List<List<Long>>> entry : tmpNewSlotAssigned.entrySet()) {
List<List<Long>> value = new ArrayList<>(entry.getValue());
- value.sort((a, b) -> a.get(0).compareTo(b.get(0)));
+ value.sort(Comparator.comparing(a -> a.get(0)));
newSlotAssigned.put(entry.getKey(), value);
}
Map<List<Object>, List<List<Long>>> diff = mapDiff(slotAssigned, newSlotAssigned);
@@ -1217,7 +1285,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
return allNodeHost;
} else {
// rebalance
- Map<String, String> ret = new HashMap();
+ Map<String, String> ret = new HashMap<>();
for (Map.Entry<List<Long>, NodeInfo> entry : newExecutorNodePort.entrySet()) {
NodeInfo newNodeInfo = entry.getValue();
NodeInfo oldNodeInfo = oldExecutorNodePort.get(entry.getKey());
@@ -1984,11 +2052,14 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
Cluster cluster = new Cluster(inimbus, supervisors, topoToSchedAssignment, topologies, conf);
cluster.setStatusMap(idToSchedStatus.get());
- long beforeSchedule = System.currentTimeMillis();
+ schedulingStartTimeNs.set(Time.nanoTime());
scheduler.schedule(topologies, cluster);
- long scheduleTimeElapsedMs = System.currentTimeMillis() - beforeSchedule;
- LOG.debug("Scheduling took {} ms for {} topologies", scheduleTimeElapsedMs, topologies.getTopologies().size());
- scheduleTopologyTimeMs.update(scheduleTimeElapsedMs);
+ //Get and set the start time before getting current time in order to avoid potential race with the longest-scheduling-time-ms gauge
+ final Long startTime = schedulingStartTimeNs.getAndSet(null);
+ long elapsed = Time.nanoTime() - startTime;
+ longestSchedulingTime.accumulateAndGet(elapsed, Math::max);
+ schedulingDuration.update(elapsed, TimeUnit.NANOSECONDS);
+ LOG.debug("Scheduling took {} ms for {} topologies", elapsed, topologies.getTopologies().size());
//merge with existing statuses
idToSchedStatus.set(Utils.merge(idToSchedStatus.get(), cluster.getStatusMap()));
@@ -2131,17 +2202,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
}
// make the new assignments for topologies
- Map<String, SchedulerAssignment> newSchedulerAssignments = null;
synchronized (schedLock) {
- newSchedulerAssignments = computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId);
+ Map<String, SchedulerAssignment> newSchedulerAssignments =
+ computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId);
Map<String, Map<List<Long>, List<Object>>> topologyToExecutorToNodePort =
- computeNewTopoToExecToNodePort(newSchedulerAssignments, existingAssignments);
- for (String id : assignedTopologyIds) {
- if (!topologyToExecutorToNodePort.containsKey(id)) {
- topologyToExecutorToNodePort.put(id, null);
- }
- }
+ computeTopoToExecToNodePort(newSchedulerAssignments, assignedTopologyIds);
Map<String, Map<WorkerSlot, WorkerResources>> newAssignedWorkerToResources =
computeTopoToNodePortToResources(newSchedulerAssignments);
int nowSecs = Time.currentTimeSecs();
@@ -2154,14 +2220,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
if (execToNodePort == null) {
execToNodePort = new HashMap<>();
}
- Assignment existingAssignment = existingAssignments.get(topoId);
Set<String> allNodes = new HashSet<>();
- if (execToNodePort != null) {
- for (List<Object> nodePort : execToNodePort.values()) {
- allNodes.add((String) nodePort.get(0));
- }
+ for (List<Object> nodePort : execToNodePort.values()) {
+ allNodes.add((String) nodePort.get(0));
}
Map<String, String> allNodeHost = new HashMap<>();
+ Assignment existingAssignment = existingAssignments.get(topoId);
if (existingAssignment != null) {
allNodeHost.putAll(existingAssignment.get_node_host());
}
@@ -2219,15 +2283,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
newAssignments.put(topoId, newAssignment);
}
- if (!newAssignments.equals(existingAssignments)) {
+ boolean assignmentChanged = auditAssignmentChanges(existingAssignments, newAssignments);
+ if (assignmentChanged) {
LOG.debug("RESETTING id->resources and id->worker-resources cache!");
- LOG.info("Fragmentation after scheduling is: {} MB, {} PCore CPUs", fragmentedMemory(), fragmentedCpu());
- nodeIdToResources.get().forEach((id, node) ->
- LOG.info(
- "Node Id: {} Total Mem: {}, Used Mem: {}, Available Mem: {}, Total CPU: {}, Used "
- + "CPU: {}, Available CPU: {}, fragmented: {}",
- id, node.getTotalMem(), node.getUsedMem(), node.getAvailableMem(),
- node.getTotalCpu(), node.getUsedCpu(), node.getAvailableCpu(), isFragmented(node)));
idToResources.set(new HashMap<>());
idToWorkerResources.set(new HashMap<>());
}
@@ -2826,21 +2884,27 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
.parallelStream()
.mapToDouble(SupervisorResources::getTotalCpu)
.sum());
-
+ StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> {
+ //We want to update longest scheduling time in real time in case scheduler get stuck
+ // Get current time before startTime to avoid potential race with scheduler's Timer
+ Long currTime = Time.nanoTime();
+ Long startTime = schedulingStartTimeNs.get();
+ return TimeUnit.NANOSECONDS.toMillis(startTime == null ?
+ longestSchedulingTime.get() : Math.max(currTime - startTime, longestSchedulingTime.get()));
+ });
+ StormMetricsRegistry.registerMeter("nimbus:num-launched").mark();
StormMetricsRegistry.startMetricsReporters(conf);
- if (clusterConsumerExceutors != null) {
- timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)),
- () -> {
- try {
- if (isLeader()) {
- sendClusterMetricsToExecutors();
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
+ timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)),
+ () -> {
+ try {
+ if (isLeader()) {
+ sendClusterMetricsToExecutors();
}
- });
- }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
} catch (Exception e) {
if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
throw e;
@@ -3689,7 +3753,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
beginFileUploadCalls.mark();
checkAuthorization(null, null, "fileUpload");
String fileloc = getInbox() + "/stormjar-" + Utils.uuid() + ".jar";
- uploaders.put(fileloc, Channels.newChannel(new FileOutputStream(fileloc)));
+ uploaders.put(fileloc, new TimedWritableByteChannel(Channels.newChannel(new FileOutputStream(fileloc)), fileUploadDuration));
LOG.info("Uploading file from client to {}", fileloc);
return fileloc;
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index 90d68dc..4619aeb 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -114,20 +114,17 @@ public class SupervisorUtils {
* @param conf
* @return
*
- * @throws Exception
*/
- public static Map<String, LSWorkerHeartbeat> readWorkerHeartbeats(Map<String, Object> conf) throws Exception {
+ public static Map<String, LSWorkerHeartbeat> readWorkerHeartbeats(Map<String, Object> conf) {
+ // Static facade over the _instance object's readWorkerHeartbeatsImpl, which in this change also
+ // stopped declaring a checked exception.
return _instance.readWorkerHeartbeatsImpl(conf);
}
/**
- * get worker heartbeat by workerId
+ * get worker heartbeat by workerId.
*
* @param conf
* @param workerId
* @return
- *
- * @throws IOException
*/
private static LSWorkerHeartbeat readWorkerHeartbeat(Map<String, Object> conf, String workerId) {
return _instance.readWorkerHeartbeatImpl(conf, workerId);
@@ -137,7 +134,7 @@ public class SupervisorUtils {
return _instance.isWorkerHbTimedOutImpl(now, whb, conf);
}
- public Map<String, LSWorkerHeartbeat> readWorkerHeartbeatsImpl(Map<String, Object> conf) throws Exception {
+ public Map<String, LSWorkerHeartbeat> readWorkerHeartbeatsImpl(Map<String, Object> conf) {
Map<String, LSWorkerHeartbeat> workerHeartbeats = new HashMap<>();
Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf);
http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
index 952d8d9..f12713b 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
@@ -52,8 +52,7 @@ public abstract class LocallyCachedBlob {
private long lastUsed = Time.currentTimeMillis();
private CompletableFuture<Void> doneUpdating = null;
- private static final Histogram fetchingRate = StormMetricsRegistry.registerHistogram(
- "supervisor:blob-fetching-rate-MB/s", new ExponentiallyDecayingReservoir());
+ private static final Histogram fetchingRate = StormMetricsRegistry.registerHistogram("supervisor:blob-fetching-rate-MB/s");
/**
* Create a new LocallyCachedBlob.
http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
index 602f53e..ea8867e 100644
--- a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
+++ b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
@@ -18,18 +18,21 @@ import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.MetricSet;
import com.codahale.metrics.Reservoir;
import com.codahale.metrics.Timer;
import java.util.Map;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.storm.daemon.metrics.MetricsUtils;
import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StormMetricsRegistry extends MetricRegistry {
- private static final StormMetricsRegistry REGISTRY = new StormMetricsRegistry();
+ @VisibleForTesting
+ static final StormMetricsRegistry REGISTRY = new StormMetricsRegistry();
private static final Logger LOG = LoggerFactory.getLogger(StormMetricsRegistry.class);
private StormMetricsRegistry() {/*Singleton pattern*/}
@@ -54,6 +57,25 @@ public class StormMetricsRegistry extends MetricRegistry {
REGISTRY.register(name, meter);
}
+    /**
+     * Adds every metric of {@code metrics} to the shared registry, descending into nested sets.
+     *
+     * @param metrics the set whose metrics should be registered
+     */
+    public static void registerMetricSet(MetricSet metrics) {
+        REGISTRY.registerAll(metrics);
+    }
+
+    /**
+     * Removes every metric of {@code metrics} (including those in nested sets) from the shared registry.
+     *
+     * @param metrics the set whose metrics should be removed
+     */
+    public static void unregisterMetricSet(MetricSet metrics) {
+        final String noPrefix = null;
+        unregisterMetricSet(noPrefix, metrics);
+    }
+
+    /**
+     * Removes every metric of {@code metrics} from the shared registry, qualifying each key with
+     * {@code prefix}. Nested sets are walked recursively, with their own key appended to the prefix.
+     * Removal is by name only, so a metric registered under the same name by other code is removed too.
+     *
+     * @param prefix name prefix to apply; may be null for no prefix
+     * @param metrics the set whose metrics should be removed
+     */
+    public static void unregisterMetricSet(String prefix, MetricSet metrics) {
+        metrics.getMetrics().forEach((key, metric) -> {
+            final String qualifiedName = name(prefix, key);
+            if (!(metric instanceof MetricSet)) {
+                REGISTRY.remove(qualifiedName);
+                return;
+            }
+            unregisterMetricSet(qualifiedName, (MetricSet) metric);
+        });
+    }
+
+ /**
+ * Registers a new {@link Timer} under the given name in the shared registry.
+ *
+ * @param name metric name
+ * @return whatever the overridden {@code register(String, Metric)} returns; on a name collision that
+ * override decides the outcome (presumably returning the already-registered timer -- confirm)
+ */
public static Timer registerTimer(String name) {
return REGISTRY.register(name, new Timer());
}
@@ -84,6 +106,7 @@ public class StormMetricsRegistry extends MetricRegistry {
*/
@Override
public <T extends Metric> T register(final String name, T metric) throws IllegalArgumentException {
+ assert !(metric instanceof MetricSet);
try {
return super.register(name, metric);
} catch (IllegalArgumentException e) {
http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java b/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
index e54509e..3783fdb 100644
--- a/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
@@ -19,6 +19,8 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import javax.security.auth.Subject;
+
+import com.codahale.metrics.Meter;
import org.apache.commons.io.IOUtils;
import org.apache.storm.Config;
import org.apache.storm.blobstore.BlobStore;
@@ -29,6 +31,7 @@ import org.apache.storm.daemon.nimbus.TopoCache;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.StormTopology;
+import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.security.auth.ReqContext;
import org.apache.storm.shade.com.google.common.base.Joiner;
import org.apache.storm.shade.com.google.common.collect.Sets;
@@ -45,6 +48,8 @@ import org.slf4j.LoggerFactory;
* A callback function when nimbus gains leadership.
*/
public class LeaderListenerCallback {
+ private static final Meter numGainedLeader = StormMetricsRegistry.registerMeter("nimbus:num-gained-leadership");
+ private static final Meter numLostLeader = StormMetricsRegistry.registerMeter("nimbus:num-lost-leadership");
private static final Logger LOG = LoggerFactory.getLogger(LeaderListenerCallback.class);
private static final String STORM_JAR_SUFFIX = "-stormjar.jar";
private static final String STORM_CODE_SUFFIX = "-stormcode.ser";
@@ -82,6 +87,7 @@ public class LeaderListenerCallback {
* Invoke when gains leadership.
*/
public void leaderCallBack() {
+ numGainedLeader.mark();
//set up nimbus-info to zk
setUpNimbusInfo(acls);
//sync zk assignments/id-info to local
@@ -131,6 +137,7 @@ public class LeaderListenerCallback {
* Invoke when lost leadership.
*/
public void notLeaderCallback() {
+ // Count the leadership loss (nimbus:num-lost-leadership meter) before clearing tc
+ // (presumably the TopoCache this class imports -- confirm).
+ numLostLeader.mark();
tc.clear();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index 3f48669..d014236 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -145,11 +145,7 @@ public class Cluster implements ISchedulingState {
String nodeId = entry.getKey();
SupervisorDetails supervisor = entry.getValue();
String host = supervisor.getHost();
- List<String> ids = hostToId.get(host);
- if (ids == null) {
- ids = new ArrayList<>();
- hostToId.put(host, ids);
- }
+ List<String> ids = hostToId.computeIfAbsent(host, k -> new ArrayList<>());
ids.add(nodeId);
}
this.conf = conf;
@@ -173,11 +169,7 @@ public class Cluster implements ISchedulingState {
for (Map.Entry<String, String> entry : resolvedSuperVisors.entrySet()) {
String hostName = entry.getKey();
String rack = entry.getValue();
- List<String> nodesForRack = this.networkTopography.get(rack);
- if (nodesForRack == null) {
- nodesForRack = new ArrayList<>();
- this.networkTopography.put(rack, nodesForRack);
- }
+ List<String> nodesForRack = this.networkTopography.computeIfAbsent(rack, k -> new ArrayList<>());
nodesForRack.add(hostName);
}
} else {
http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java
index 855cc96..18de717 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java
@@ -18,6 +18,9 @@
package org.apache.storm.scheduler;
+import java.util.Arrays;
+import java.util.List;
+
public class ExecutorDetails {
public final int startTask;
public final int endTask;
@@ -35,9 +38,13 @@ public class ExecutorDetails {
return endTask;
}
+    /**
+     * Returns this executor's task range as {@code [startTask, endTask]}, both widened to {@code long}.
+     *
+     * @return a fixed-size two-element list (as produced by {@link Arrays#asList})
+     */
+    public List<Long> toList() {
+        final Long first = (long) startTask;
+        final Long last = (long) endTask;
+        return Arrays.asList(first, last);
+    }
+
@Override
public boolean equals(Object other) {
- if (other == null || !(other instanceof ExecutorDetails)) {
+ if (!(other instanceof ExecutorDetails)) {
return false;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-server/src/test/java/org/apache/storm/metric/StormMetricsRegistryTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/metric/StormMetricsRegistryTest.java b/storm-server/src/test/java/org/apache/storm/metric/StormMetricsRegistryTest.java
new file mode 100644
index 0000000..5d9b3e4
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/metric/StormMetricsRegistryTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.metric;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
+import com.codahale.metrics.Timer;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static com.codahale.metrics.MetricRegistry.name;
+import static org.junit.jupiter.api.Assertions.*;
+
+class StormMetricsRegistryTest {
+    private static final Logger LOG = LoggerFactory.getLogger(StormMetricsRegistryTest.class);
+
+    private static final String OUTER_METER = "outerMeter";
+    private static final String INNER_SET = "innerSet";
+    private static final String OUTER_TIMER = "outerTimer";
+    private static final String INNER_METER = "innerMeter";
+    private static final String INNER_TIMER = "innerTimer";
+    private static final MetricSet OUTER = newMetricSetInstance();
+    // Registry names of every leaf metric in OUTER; inner-set metrics are prefixed with the set's key.
+    private static final String[] ALL_LEAF_NAMES = {
+        OUTER_METER, OUTER_TIMER, name(INNER_SET, INNER_METER), name(INNER_SET, INNER_TIMER)
+    };
+
+    /**
+     * Registering a nested set exposes every leaf metric under its prefixed name, keeps a metric that was
+     * already registered under a colliding name, and is idempotent. The set is unregistered afterwards so
+     * the shared registry is left clean regardless of test order.
+     */
+    @Test
+    void registerMetricSet() {
+        try {
+            Meter existingInnerMeter = StormMetricsRegistry.registerMeter(name(INNER_SET, INNER_METER));
+
+            LOG.info("register outer set");
+            StormMetricsRegistry.registerMetricSet(OUTER);
+            assertSame(OUTER.getMetrics().get(OUTER_TIMER), StormMetricsRegistry.REGISTRY.getMetrics().get(OUTER_TIMER));
+            assertSame(OUTER.getMetrics().get(OUTER_METER), StormMetricsRegistry.REGISTRY.getMetrics().get(OUTER_METER));
+            assertSame(((MetricSet) OUTER.getMetrics().get(INNER_SET)).getMetrics().get(INNER_TIMER),
+                StormMetricsRegistry.REGISTRY.getMetrics().get(name(INNER_SET, INNER_TIMER)));
+
+            assertNotSame(((MetricSet) OUTER.getMetrics().get(INNER_SET)).getMetrics().get(INNER_METER),
+                StormMetricsRegistry.REGISTRY.getMetrics().get(name(INNER_SET, INNER_METER)));
+            assertSame(existingInnerMeter, StormMetricsRegistry.REGISTRY.getMetrics().get(name(INNER_SET, INNER_METER)));
+
+            //Ensure idempotency
+            LOG.info("twice register outer set");
+            MetricSet newOuter = newMetricSetInstance();
+            StormMetricsRegistry.registerMetricSet(newOuter);
+            assertSame(OUTER.getMetrics().get(OUTER_TIMER), StormMetricsRegistry.REGISTRY.getMetrics().get(OUTER_TIMER));
+            assertSame(OUTER.getMetrics().get(OUTER_METER), StormMetricsRegistry.REGISTRY.getMetrics().get(OUTER_METER));
+            assertSame(((MetricSet) OUTER.getMetrics().get(INNER_SET)).getMetrics().get(INNER_TIMER),
+                StormMetricsRegistry.REGISTRY.getMetrics().get(name(INNER_SET, INNER_TIMER)));
+            assertSame(existingInnerMeter, StormMetricsRegistry.REGISTRY.getMetrics().get(name(INNER_SET, INNER_METER)));
+
+            LOG.info("name collision");
+            assertThrows(IllegalArgumentException.class, () -> StormMetricsRegistry.registerGauge(name(INNER_SET, INNER_METER), () -> 0));
+        } finally {
+            // Removal is by name, so this also drops existingInnerMeter.
+            StormMetricsRegistry.unregisterMetricSet(OUTER);
+        }
+    }
+
+    /**
+     * Unregistering a nested set removes every leaf metric it contributed. Only the set's own names are
+     * checked: REGISTRY is a static singleton shared by the whole test JVM, so other classes may
+     * legitimately have registered unrelated metrics and the registry need not be empty.
+     */
+    @Test
+    void unregisterMetricSet() {
+        StormMetricsRegistry.registerMetricSet(OUTER);
+        StormMetricsRegistry.unregisterMetricSet(OUTER);
+        Map<String, Metric> remaining = StormMetricsRegistry.REGISTRY.getMetrics();
+        for (String metricName : ALL_LEAF_NAMES) {
+            assertFalse(remaining.containsKey(metricName), metricName + " should have been unregistered");
+        }
+    }
+
+    /**
+     * Builds a fresh two-level metric set: an outer meter and timer plus a nested set holding an inner
+     * meter and timer.
+     */
+    private static MetricSet newMetricSetInstance() {
+        return new MetricSet() {
+            private final MetricSet inner = new MetricSet() {
+                private final Map<String, Metric> map = new HashMap<>();
+
+                {
+                    map.put(INNER_METER, new Meter());
+                    map.put(INNER_TIMER, new Timer());
+                }
+
+                @Override
+                public Map<String, Metric> getMetrics() {
+                    return map;
+                }
+            };
+            private final Map<String, Metric> outerMap = new HashMap<>();
+
+            {
+                outerMap.put(OUTER_METER, new Meter());
+                outerMap.put(INNER_SET, inner);
+                outerMap.put(OUTER_TIMER, new Timer());
+            }
+
+            @Override
+            public Map<String, Metric> getMetrics() {
+                return outerMap;
+            }
+        };
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
index 07ac14b..07b971c 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
@@ -21,6 +21,8 @@ package org.apache.storm.daemon.logviewer;
import static org.apache.storm.DaemonConfig.UI_HEADER_BUFFER_BYTES;
import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
import com.google.common.annotations.VisibleForTesting;
import java.io.File;
@@ -31,6 +33,7 @@ import java.util.Map;
import org.apache.storm.DaemonConfig;
import org.apache.storm.daemon.logviewer.utils.DirectoryCleaner;
+import org.apache.storm.daemon.logviewer.utils.ExceptionMeters;
import org.apache.storm.daemon.logviewer.utils.LogCleaner;
import org.apache.storm.daemon.logviewer.utils.WorkerLogs;
import org.apache.storm.daemon.logviewer.webapp.LogviewerApplication;
@@ -126,6 +129,7 @@ public class LogviewerServer implements AutoCloseable {
void start() throws Exception {
LOG.info("Starting Logviewer...");
if (httpServer != null) {
+ StormMetricsRegistry.registerMetricSet(ExceptionMeters::getMetrics);
httpServer.start();
}
}
@@ -165,7 +169,7 @@ public class LogviewerServer implements AutoCloseable {
try (LogviewerServer server = new LogviewerServer(conf);
LogCleaner logCleaner = new LogCleaner(conf, workerLogs, directoryCleaner, logRootDir)) {
- Utils.addShutdownHookWithForceKillIn1Sec(() -> server.close());
+ Utils.addShutdownHookWithForceKillIn1Sec(server::close);
logCleaner.start();
StormMetricsRegistry.startMetricsReporters(conf);
server.start();
http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java
index 32e79eb..089d965 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java
@@ -37,11 +37,13 @@ import static java.util.stream.Collectors.toCollection;
import static java.util.stream.Collectors.toList;
import static org.apache.commons.lang.StringEscapeUtils.escapeHtml;
+import com.codahale.metrics.Meter;
import j2html.tags.DomContent;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
@@ -61,6 +63,7 @@ import javax.ws.rs.core.Response;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.daemon.logviewer.LogviewerConstant;
import org.apache.storm.daemon.logviewer.utils.DirectoryCleaner;
+import org.apache.storm.daemon.logviewer.utils.ExceptionMeters;
import org.apache.storm.daemon.logviewer.utils.LogviewerResponseBuilder;
import org.apache.storm.daemon.logviewer.utils.ResourceAuthorizer;
import org.apache.storm.daemon.logviewer.utils.WorkerLogs;
@@ -68,11 +71,13 @@ import org.apache.storm.daemon.ui.InvalidRequestException;
import org.apache.storm.daemon.ui.UIHelpers;
import org.apache.storm.daemon.utils.StreamUtil;
import org.apache.storm.daemon.utils.UrlBuilder;
+import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ServerUtils;
import org.jooq.lambda.Unchecked;
public class LogviewerLogPageHandler {
+ private static final Meter numPageRead = StormMetricsRegistry.registerMeter("logviewer:num-page-read");
private final String logRoot;
private final String daemonLogRoot;
private final WorkerLogs workerLogs;
@@ -152,7 +157,7 @@ public class LogviewerLogPageHandler {
List<String> files;
if (fileResults != null) {
files = fileResults.stream()
- .map(file -> WorkerLogs.getTopologyPortWorkerLog(file))
+ .map(WorkerLogs::getTopologyPortWorkerLog)
.sorted().collect(toList());
} else {
files = new ArrayList<>();
@@ -162,11 +167,12 @@ public class LogviewerLogPageHandler {
}
/**
- * Provides a worker log file to view.
+ * Provides a worker log file to view, starting from the specified position
+ * or default starting position of the most recent page.
*
* @param fileName file to view
- * @param start start offset, can be null
- * @param length length to read in this page, can be null
+ * @param start start offset, or null if the most recent page is desired
+ * @param length length to read in this page, or null if default page length is desired
* @param grep search string if request is a result of the search, can be null
* @param user username
* @return HTML view page of worker log
@@ -179,7 +185,6 @@ public class LogviewerLogPageHandler {
File file = new File(rootDir, fileName).getCanonicalFile();
String path = file.getCanonicalPath();
- boolean isZipFile = path.endsWith(".gz");
File topoDir = file.getParentFile().getParentFile();
if (file.exists() && new File(rootDir).getCanonicalFile().equals(topoDir.getParentFile())) {
@@ -193,24 +198,21 @@ public class LogviewerLogPageHandler {
throw e.getCause();
}
- List<String> filesStrWithoutFileParam = logFiles.stream().map(WorkerLogs::getTopologyPortWorkerLog)
- .filter(fileStr -> !StringUtils.equals(fileName, fileStr)).collect(toList());
-
- List<String> reorderedFilesStr = new ArrayList<>();
- reorderedFilesStr.addAll(filesStrWithoutFileParam);
+ List<String> reorderedFilesStr = logFiles.stream()
+ .map(WorkerLogs::getTopologyPortWorkerLog)
+ .filter(fileStr -> !StringUtils.equals(fileName, fileStr))
+ .collect(toList());
reorderedFilesStr.add(fileName);
length = length != null ? Math.min(10485760, length) : LogviewerConstant.DEFAULT_BYTES_PER_PAGE;
-
- String logString;
- if (isTxtFile(fileName)) {
- logString = escapeHtml(start != null ? pageFile(path, start, length) : pageFile(path, length));
- } else {
- logString = escapeHtml("This is a binary file and cannot display! You may download the full file.");
+ final boolean isZipFile = path.endsWith(".gz");
+ long fileLength = getFileLength(file, isZipFile);
+ if (start == null) {
+ start = Long.valueOf(fileLength - length).intValue();
}
- long fileLength = isZipFile ? ServerUtils.zipFileSize(file) : file.length();
- start = start != null ? start : Long.valueOf(fileLength - length).intValue();
+ String logString = isTxtFile(fileName) ? escapeHtml(pageFile(path, isZipFile, fileLength, start, length)) :
+ escapeHtml("This is a binary file and cannot display! You may download the full file.");
List<DomContent> bodyContents = new ArrayList<>();
if (StringUtils.isNotEmpty(grep)) {
@@ -254,8 +256,8 @@ public class LogviewerLogPageHandler {
* Provides a daemon log file to view.
*
* @param fileName file to view
- * @param start start offset, can be null
- * @param length length to read in this page, can be null
+ * @param start start offset, or null if the most recent page is desired
+ * @param length length to read in this page, or null if default page length is desired
* @param grep search string if request is a result of the search, can be null
* @param user username
* @return HTML view page of daemon log
@@ -265,7 +267,6 @@ public class LogviewerLogPageHandler {
String rootDir = daemonLogRoot;
File file = new File(rootDir, fileName).getCanonicalFile();
String path = file.getCanonicalPath();
- boolean isZipFile = path.endsWith(".gz");
if (file.exists() && new File(rootDir).getCanonicalFile().equals(file.getParentFile())) {
// all types of files included
@@ -273,24 +274,21 @@ public class LogviewerLogPageHandler {
.filter(File::isFile)
.collect(toList());
- List<String> filesStrWithoutFileParam = logFiles.stream()
- .map(File::getName).filter(fName -> !StringUtils.equals(fileName, fName)).collect(toList());
-
- List<String> reorderedFilesStr = new ArrayList<>();
- reorderedFilesStr.addAll(filesStrWithoutFileParam);
+ List<String> reorderedFilesStr = logFiles.stream()
+ .map(File::getName)
+ .filter(fName -> !StringUtils.equals(fileName, fName))
+ .collect(toList());
reorderedFilesStr.add(fileName);
length = length != null ? Math.min(10485760, length) : LogviewerConstant.DEFAULT_BYTES_PER_PAGE;
-
- String logString;
- if (isTxtFile(fileName)) {
- logString = escapeHtml(start != null ? pageFile(path, start, length) : pageFile(path, length));
- } else {
- logString = escapeHtml("This is a binary file and cannot display! You may download the full file.");
+ final boolean isZipFile = path.endsWith(".gz");
+ long fileLength = getFileLength(file, isZipFile);
+ if (start == null) {
+ start = Long.valueOf(fileLength - length).intValue();
}
- long fileLength = isZipFile ? ServerUtils.zipFileSize(file) : file.length();
- start = start != null ? start : Long.valueOf(fileLength - length).intValue();
+ String logString = isTxtFile(fileName) ? escapeHtml(pageFile(path, isZipFile, fileLength, start, length)) :
+ escapeHtml("This is a binary file and cannot display! You may download the full file.");
List<DomContent> bodyContents = new ArrayList<>();
if (StringUtils.isNotEmpty(grep)) {
@@ -323,6 +321,32 @@ public class LogviewerLogPageHandler {
}
}
+    /**
+     * Returns the length of a log file, recording gzip I/O failures in the logviewer exception meters.
+     *
+     * @param file log file to measure
+     * @param isZipFile whether {@code file} is gzip-compressed, in which case its size is obtained
+     *                  from {@link ServerUtils#zipFileSize(File)}
+     * @return the file length used for paging
+     * @throws FileNotFoundException if the gzip file cannot be opened (marks NUM_FILE_OPEN_EXCEPTIONS)
+     * @throws IOException on any other gzip read failure (marks NUM_FILE_READ_EXCEPTIONS)
+     */
+    private long getFileLength(File file, boolean isZipFile) throws IOException {
+        if (!isZipFile) {
+            // File#length() never throws IOException (it returns 0L for a missing file), so nothing to meter.
+            return file.length();
+        }
+        try {
+            return ServerUtils.zipFileSize(file);
+        } catch (FileNotFoundException e) {
+            ExceptionMeters.NUM_FILE_OPEN_EXCEPTIONS.mark();
+            throw e;
+        } catch (IOException e) {
+            ExceptionMeters.NUM_FILE_READ_EXCEPTIONS.mark();
+            throw e;
+        }
+    }
+
private DomContent logTemplate(List<DomContent> bodyContents, String fileName, String user) {
List<DomContent> finalBodyContents = new ArrayList<>();
@@ -426,17 +436,8 @@ public class LogviewerLogPageHandler {
return a(text).withHref(url).withClass("btn btn-default " + (enabled ? "enabled" : "disabled"));
}
- private String pageFile(String path, Integer tail) throws IOException, InvalidRequestException {
- boolean isZipFile = path.endsWith(".gz");
- long fileLength = isZipFile ? ServerUtils.zipFileSize(new File(path)) : new File(path).length();
- long skip = fileLength - tail;
- return pageFile(path, Long.valueOf(skip).intValue(), tail);
- }
-
- private String pageFile(String path, Integer start, Integer length) throws IOException, InvalidRequestException {
- boolean isZipFile = path.endsWith(".gz");
- long fileLength = isZipFile ? ServerUtils.zipFileSize(new File(path)) : new File(path).length();
-
+ private String pageFile(String path, boolean isZipFile, long fileLength, Integer start, Integer readLength)
+ throws IOException, InvalidRequestException {
try (InputStream input = isZipFile ? new GZIPInputStream(new FileInputStream(path)) : new FileInputStream(path);
ByteArrayOutputStream output = new ByteArrayOutputStream()) {
if (start >= fileLength) {
@@ -447,8 +448,8 @@ public class LogviewerLogPageHandler {
}
byte[] buffer = new byte[1024];
- while (output.size() < length) {
- int size = input.read(buffer, 0, Math.min(1024, length - output.size()));
+ while (output.size() < readLength) {
+ int size = input.read(buffer, 0, Math.min(1024, readLength - output.size()));
if (size > 0) {
output.write(buffer, 0, size);
} else {
@@ -456,7 +457,14 @@ public class LogviewerLogPageHandler {
}
}
+ numPageRead.mark();
return output.toString();
+ } catch (FileNotFoundException e) {
+ ExceptionMeters.NUM_FILE_OPEN_EXCEPTIONS.mark();
+ throw e;
+ } catch (IOException e) {
+ ExceptionMeters.NUM_FILE_READ_EXCEPTIONS.mark();
+ throw e;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java
index a26396c..bcde077 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java
@@ -25,6 +25,10 @@ import static org.apache.storm.daemon.utils.ListFunctionalSupport.last;
import static org.apache.storm.daemon.utils.ListFunctionalSupport.rest;
import static org.apache.storm.daemon.utils.PathUtil.truncatePathToLastElements;
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
@@ -46,6 +50,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
+import java.util.stream.Stream;
import java.util.zip.GZIPInputStream;
import javax.ws.rs.core.Response;
@@ -56,12 +61,14 @@ import org.apache.storm.DaemonConfig;
import org.apache.storm.daemon.common.JsonResponseBuilder;
import org.apache.storm.daemon.logviewer.LogviewerConstant;
import org.apache.storm.daemon.logviewer.utils.DirectoryCleaner;
+import org.apache.storm.daemon.logviewer.utils.ExceptionMeters;
import org.apache.storm.daemon.logviewer.utils.LogviewerResponseBuilder;
import org.apache.storm.daemon.logviewer.utils.ResourceAuthorizer;
import org.apache.storm.daemon.logviewer.utils.WorkerLogs;
import org.apache.storm.daemon.ui.InvalidRequestException;
import org.apache.storm.daemon.utils.StreamUtil;
import org.apache.storm.daemon.utils.UrlBuilder;
+import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.Utils;
@@ -71,6 +78,9 @@ import org.slf4j.LoggerFactory;
public class LogviewerLogSearchHandler {
private static final Logger LOG = LoggerFactory.getLogger(LogviewerLogSearchHandler.class);
+ private static final Meter numDeepSearchNoResult = StormMetricsRegistry.registerMeter("logviewer:num-deep-search-no-result");
+ private static final Histogram numFileScanned = StormMetricsRegistry.registerHistogram("logviewer:num-files-scanned-per-deep-search");
+ private static final Meter numSearchRequestNoResult = StormMetricsRegistry.registerMeter("logviewer:num-search-request-no-result");
public static final int GREP_MAX_SEARCH_SIZE = 1024;
public static final int GREP_BUF_SIZE = 2048;
@@ -124,6 +134,8 @@ public class LogviewerLogSearchHandler {
public Response searchLogFile(String fileName, String user, boolean isDaemon, String search,
String numMatchesStr, String offsetStr, String callback, String origin)
throws IOException, InvalidRequestException {
+ boolean noResult = true;
+
String rootDir = isDaemon ? daemonLogRoot : logRoot;
File file = new File(rootDir, fileName).getCanonicalFile();
Response response;
@@ -136,7 +148,9 @@ public class LogviewerLogSearchHandler {
if (StringUtils.isNotEmpty(search) && search.getBytes("UTF-8").length <= GREP_MAX_SEARCH_SIZE) {
Map<String, Object> entity = new HashMap<>();
entity.put("isDaemon", isDaemon ? "yes" : "no");
- entity.putAll(substringSearch(file, search, isDaemon, numMatchesInt, offsetInt));
+ Map<String, Object> res = substringSearch(file, search, isDaemon, numMatchesInt, offsetInt);
+ entity.putAll(res);
+ noResult = ((List) res.get("matches")).isEmpty();
response = LogviewerResponseBuilder.buildSuccessJsonResponse(entity, callback, origin);
} else {
@@ -159,16 +173,20 @@ public class LogviewerLogSearchHandler {
response = new JsonResponseBuilder().setData(entity).setCallback(callback).setStatus(404).build();
}
+ if (noResult) {
+ numSearchRequestNoResult.mark();
+ }
return response;
}
/**
- * Deep search across worker log files in a topology.
+ * Advanced search across worker log files in a topology.
*
* @param topologyId topology ID
* @param user username
* @param search search string
- * @param numMatchesStr the count of maximum matches
+ * @param numMatchesStr the maximum number of matches to return. Note that this limit applies to
+ * each port, not to each log file or to the whole search request
* @param portStr worker port, null or '*' if the request wants to search from all worker logs
* @param fileOffsetStr index (offset) of the log files
* @param offsetStr start offset for log file
@@ -180,6 +198,9 @@ public class LogviewerLogSearchHandler {
public Response deepSearchLogsForTopology(String topologyId, String user, String search,
String numMatchesStr, String portStr, String fileOffsetStr, String offsetStr,
Boolean searchArchived, String callback, String origin) {
+ int numMatchedFiles = 0;
+ int numScannedFiles = 0;
+
String rootDir = logRoot;
Object returnValue;
File topologyDir = new File(rootDir, topologyId);
@@ -200,24 +221,24 @@ public class LogviewerLogSearchHandler {
if (StringUtils.isEmpty(portStr) || portStr.equals("*")) {
// check for all ports
- List<List<File>> filteredLogs = portDirs.stream()
- .map(portDir -> logsForPort(user, portDir))
- .filter(logs -> logs != null && !logs.isEmpty())
- .collect(toList());
+ Stream<List<File>> portsOfLogs = portDirs.stream()
+ .map(portDir -> logsForPort(user, portDir))
+ .filter(logs -> logs != null && !logs.isEmpty());
- if (BooleanUtils.isTrue(searchArchived)) {
- returnValue = filteredLogs.stream()
- .map(fl -> findNMatches(fl, numMatches, 0, 0, search))
- .collect(toList());
- } else {
- returnValue = filteredLogs.stream()
- .map(fl -> Collections.singletonList(first(fl)))
- .map(fl -> findNMatches(fl, numMatches, 0, 0, search))
- .collect(toList());
+ if (BooleanUtils.isNotTrue(searchArchived)) {
+ portsOfLogs = portsOfLogs.map(fl -> Collections.singletonList(first(fl)));
}
+
+ final List<Matched> matchedList = portsOfLogs
+ .map(logs -> findNMatches(logs, numMatches, 0, 0, search))
+ .collect(toList());
+ numMatchedFiles = matchedList.stream().mapToInt(match -> match.getMatches().size()).sum();
+ numScannedFiles = matchedList.stream().mapToInt(match -> match.openedFiles).sum();
+ returnValue = matchedList;
} else {
int port = Integer.parseInt(portStr);
// check just the one port
+ @SuppressWarnings("unchecked")
List<Integer> slotsPorts = (List<Integer>) stormConf.getOrDefault(DaemonConfig.SUPERVISOR_SLOTS_PORTS,
new ArrayList<>());
boolean containsPort = slotsPorts.stream()
@@ -232,17 +253,22 @@ public class LogviewerLogSearchHandler {
returnValue = new ArrayList<>();
} else {
List<File> filteredLogs = logsForPort(user, portDir);
- if (BooleanUtils.isTrue(searchArchived)) {
- returnValue = findNMatches(filteredLogs, numMatches, fileOffset, offset, search);
- } else {
- returnValue = findNMatches(Collections.singletonList(first(filteredLogs)),
- numMatches, 0, offset, search);
+ if (BooleanUtils.isNotTrue(searchArchived)) {
+ filteredLogs = Collections.singletonList(first(filteredLogs));
+ fileOffset = 0;
}
+ returnValue = findNMatches(filteredLogs, numMatches, fileOffset, offset, search);
+ numMatchedFiles = ((Matched) returnValue).getMatches().size();
+ numScannedFiles = ((Matched) returnValue).openedFiles;
}
}
}
}
+ if (numMatchedFiles == 0) {
+ numDeepSearchNoResult.mark();
+ }
+ numFileScanned.update(numScannedFiles);
return LogviewerResponseBuilder.buildSuccessJsonResponse(returnValue, callback, origin);
}
@@ -271,26 +297,21 @@ public class LogviewerLogSearchHandler {
private Map<String,Object> substringSearch(File file, String searchString, boolean isDaemon, Integer numMatches,
Integer startByteOffset) throws InvalidRequestException {
- try {
- if (StringUtils.isEmpty(searchString)) {
- throw new IllegalArgumentException("Precondition fails: search string should not be empty.");
- }
- if (searchString.getBytes(StandardCharsets.UTF_8).length > GREP_MAX_SEARCH_SIZE) {
- throw new IllegalArgumentException("Precondition fails: the length of search string should be less than "
- + GREP_MAX_SEARCH_SIZE);
- }
+ if (StringUtils.isEmpty(searchString)) {
+ throw new IllegalArgumentException("Precondition fails: search string should not be empty.");
+ }
+ if (searchString.getBytes(StandardCharsets.UTF_8).length > GREP_MAX_SEARCH_SIZE) {
+ throw new IllegalArgumentException("Precondition fails: the length of search string should be less than "
+ + GREP_MAX_SEARCH_SIZE);
+ }
- boolean isZipFile = file.getName().endsWith(".gz");
- try (InputStream fis = Files.newInputStream(file.toPath());
- InputStream gzippedInputStream = isZipFile ? new GZIPInputStream(fis) : fis;
- BufferedInputStream stream = new BufferedInputStream(gzippedInputStream)) {
+ boolean isZipFile = file.getName().endsWith(".gz");
+ try (InputStream fis = Files.newInputStream(file.toPath())) {
+ try (InputStream gzippedInputStream = isZipFile ? new GZIPInputStream(fis) : fis;
+ BufferedInputStream stream = new BufferedInputStream(gzippedInputStream)) {
- int fileLength;
- if (isZipFile) {
- fileLength = (int) ServerUtils.zipFileSize(file);
- } else {
- fileLength = (int) file.length();
- }
+ //It's more likely to be a file read exception here, so we don't differentiate
+ int fileLength = isZipFile ? (int) ServerUtils.zipFileSize(file) : (int) file.length();
ByteBuffer buf = ByteBuffer.allocate(GREP_BUF_SIZE);
final byte[] bufArray = buf.array();
@@ -311,7 +332,7 @@ public class LogviewerLogSearchHandler {
Arrays.fill(bufArray, (byte) 0);
int totalBytesRead = 0;
- int bytesRead = stream.read(bufArray, 0, Math.min((int) fileLength, GREP_BUF_SIZE));
+ int bytesRead = stream.read(bufArray, 0, Math.min(fileLength, GREP_BUF_SIZE));
buf.limit(bytesRead);
totalBytesRead += bytesRead;
@@ -335,7 +356,7 @@ public class LogviewerLogSearchHandler {
// buffer on the previous read.
final int newBufOffset = Math.min(buf.limit(), GREP_MAX_SEARCH_SIZE) - searchBytes.length;
- totalBytesRead = rotateGrepBuffer(buf, stream, totalBytesRead, file, fileLength);
+ totalBytesRead = rotateGrepBuffer(buf, stream, totalBytesRead, fileLength);
if (totalBytesRead < 0) {
throw new InvalidRequestException("Cannot search past the end of the file");
}
@@ -358,8 +379,14 @@ public class LogviewerLogSearchHandler {
}
}
return ret;
+ } catch (UnknownHostException | UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ } catch (IOException e) {
+ ExceptionMeters.NUM_FILE_READ_EXCEPTIONS.mark();
+ throw new RuntimeException(e);
}
} catch (IOException e) {
+ ExceptionMeters.NUM_FILE_OPEN_EXCEPTIONS.mark();
throw new RuntimeException(e);
}
}
@@ -388,32 +415,46 @@ public class LogviewerLogSearchHandler {
}
}
+ /**
+ * Find the first N matches of target string in files.
+ * @param logs all candidate log files to search
+ * @param numMatches maximum number of matches to collect
+ * @param fileOffset number of log files to skip initially
+ * @param startByteOffset number of bytes to skip in the first log file searched; later files are searched from the beginning
+ * @param targetStr string to search for
+ * @return all matched results
+ */
@VisibleForTesting
- Matched findNMatches(List<File> logs, int numMatches, int fileOffset, int offset, String search) {
+ Matched findNMatches(List<File> logs, int numMatches, int fileOffset, int startByteOffset, String targetStr) {
logs = drop(logs, fileOffset);
+ LOG.debug("{} files to scan", logs.size());
List<Map<String, Object>> matches = new ArrayList<>();
int matchCount = 0;
+ int scannedFiles = 0;
while (true) {
if (logs.isEmpty()) {
+ //fileOffset = one past last scanned file
break;
}
File firstLog = logs.get(0);
- Map<String, Object> theseMatches;
+ Map<String, Object> matchInLog;
try {
LOG.debug("Looking through {}", firstLog);
- theseMatches = substringSearch(firstLog, search, numMatches - matchCount, offset);
+ matchInLog = substringSearch(firstLog, targetStr, numMatches - matchCount, startByteOffset);
+ scannedFiles++;
} catch (InvalidRequestException e) {
LOG.error("Can't search past end of file.", e);
- theseMatches = new HashMap<>();
+ matchInLog = new HashMap<>();
}
String fileName = WorkerLogs.getTopologyPortWorkerLog(firstLog);
+ //This section simply puts the formatted log file name and corresponding port into the match entry.
final List<Map<String, Object>> newMatches = new ArrayList<>(matches);
- Map<String, Object> currentFileMatch = new HashMap<>(theseMatches);
+ Map<String, Object> currentFileMatch = new HashMap<>(matchInLog);
currentFileMatch.put("fileName", fileName);
Path firstLogAbsPath;
try {
@@ -424,27 +465,27 @@ public class LogviewerLogSearchHandler {
currentFileMatch.put("port", truncatePathToLastElements(firstLogAbsPath, 2).getName(0).toString());
newMatches.add(currentFileMatch);
- int newCount = matchCount + ((List<?>)theseMatches.get("matches")).size();
-
- //theseMatches is never empty! As guaranteed by the #get().size() method above
+ int newCount = matchCount + ((List<?>)matchInLog.get("matches")).size();
if (newCount == matchCount) {
// matches and matchCount is not changed
logs = rest(logs);
- offset = 0;
+ startByteOffset = 0;
fileOffset = fileOffset + 1;
} else if (newCount >= numMatches) {
matches = newMatches;
+ //fileOffset = the index of last scanned file
break;
} else {
matches = newMatches;
logs = rest(logs);
- offset = 0;
+ startByteOffset = 0;
fileOffset = fileOffset + 1;
matchCount = newCount;
}
}
- return new Matched(fileOffset, search, matches);
+ LOG.debug("scanned {} files", scannedFiles);
+ return new Matched(fileOffset, targetStr, matches, scannedFiles);
}
@@ -502,8 +543,7 @@ public class LogviewerLogSearchHandler {
return new SubstringSearchResult(matches, newByteOffset, newBeforeBytes);
}
- private int rotateGrepBuffer(ByteBuffer buf, BufferedInputStream stream, int totalBytesRead, File file,
- int fileLength) throws IOException {
+ private int rotateGrepBuffer(ByteBuffer buf, BufferedInputStream stream, int totalBytesRead, int fileLength) throws IOException {
byte[] bufArray = buf.array();
// Copy the 2nd half of the buffer to the first half.
@@ -513,7 +553,7 @@ public class LogviewerLogSearchHandler {
Arrays.fill(bufArray, GREP_MAX_SEARCH_SIZE, bufArray.length, (byte) 0);
// Fill the 2nd half with new bytes from the stream.
- int bytesRead = stream.read(bufArray, GREP_MAX_SEARCH_SIZE, Math.min((int) fileLength, GREP_MAX_SEARCH_SIZE));
+ int bytesRead = stream.read(bufArray, GREP_MAX_SEARCH_SIZE, Math.min(fileLength, GREP_MAX_SEARCH_SIZE));
buf.limit(GREP_MAX_SEARCH_SIZE + bytesRead);
return totalBytesRead + bytesRead;
}
@@ -693,18 +733,21 @@ public class LogviewerLogSearchHandler {
private int fileOffset;
private String searchString;
private List<Map<String, Object>> matches;
+ @JsonIgnore
+ private final int openedFiles;
/**
* Constructor.
- *
- * @param fileOffset offset (index) of the files
+ * @param fileOffset offset (index) of the files
* @param searchString search string
* @param matches map representing matched search result
+ * @param openedFiles number of files scanned, used for metrics only
*/
- public Matched(int fileOffset, String searchString, List<Map<String, Object>> matches) {
+ public Matched(int fileOffset, String searchString, List<Map<String, Object>> matches, int openedFiles) {
this.fileOffset = fileOffset;
this.searchString = searchString;
this.matches = matches;
+ this.openedFiles = openedFiles;
}
public int getFileOffset() {
http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DeletionMeta.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DeletionMeta.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DeletionMeta.java
new file mode 100644
index 0000000..9e0afd9
--- /dev/null
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DeletionMeta.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.logviewer.utils;
+
+/**
+ * Tally of what a log-cleanup deletion pass removed: the number of files and their total size.
+ *
+ * <p>Instances are immutable; {@link #EMPTY} stands for a pass that deleted nothing.
+ */
+final class DeletionMeta {
+    /** Shared result for a pass that deleted no files. */
+    static final DeletionMeta EMPTY = new DeletionMeta(0, 0);
+
+    /** Combined size, in bytes, of the deleted files. */
+    final long deletedSize;
+    /** Number of files deleted. */
+    final int deletedFiles;
+
+    DeletionMeta(long deletedSize, int deletedFiles) {
+        this.deletedSize = deletedSize;
+        this.deletedFiles = deletedFiles;
+    }
+
+    @Override
+    public String toString() {
+        return "DeletionMeta{deletedSize=" + deletedSize + ", deletedFiles=" + deletedFiles + "}";
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java
index 310bc8e..293b2be 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java
@@ -60,7 +60,12 @@ public class DirectoryCleaner {
* @return DirectoryStream
*/
public DirectoryStream<Path> getStreamForDirectory(File dir) throws IOException {
- return Files.newDirectoryStream(dir.toPath());
+ try {
+ return Files.newDirectoryStream(dir.toPath());
+ } catch (IOException e) {
+ ExceptionMeters.NUM_FILE_OPEN_EXCEPTIONS.mark();
+ throw e;
+ }
}
/**
@@ -74,11 +79,9 @@ public class DirectoryCleaner {
* @param activeDirs only for global deletion, we want to skip the active logs in activeDirs
- * @return number of files deleted
+ * @return a {@link DeletionMeta} with the number of files deleted and their total size
*/
- public int deleteOldestWhileTooLarge(List<File> dirs,
- long quota, boolean forPerDir, Set<String> activeDirs) throws IOException {
+ public DeletionMeta deleteOldestWhileTooLarge(List<File> dirs,
+ long quota, boolean forPerDir, Set<String> activeDirs) throws IOException {
long totalSize = 0;
- int deletedFiles = 0;
-
for (File dir : dirs) {
try (DirectoryStream<Path> stream = getStreamForDirectory(dir)) {
for (Path path : stream) {
@@ -87,13 +90,14 @@ public class DirectoryCleaner {
}
}
}
-
LOG.debug("totalSize: {} quota: {}", totalSize, quota);
long toDeleteSize = totalSize - quota;
if (toDeleteSize <= 0) {
- return deletedFiles;
+ return DeletionMeta.EMPTY;
}
+ int deletedFiles = 0;
+ long deletedSize = 0;
// the oldest pq_size files in this directory will be placed in PQ, with the newest at the root
PriorityQueue<File> pq = new PriorityQueue<>(PQ_SIZE, (f1, f2) -> f1.lastModified() > f2.lastModified() ? -1 : 1);
int round = 0;
@@ -134,6 +138,7 @@ public class DirectoryCleaner {
Utils.forceDelete(file.getPath());
LOG.info("Delete file: {}, size: {}, lastModified: {}", canonicalPath, fileSize, lastModified);
toDeleteSize -= fileSize;
+ deletedSize += fileSize;
deletedFiles++;
} catch (IOException e) {
excluded.add(file);
@@ -157,7 +162,7 @@ public class DirectoryCleaner {
forPerDir ? "this directory" : "root directory", toDeleteSize * 1e-6);
}
}
- return deletedFiles;
+ return new DeletionMeta(deletedSize, deletedFiles);
}
private boolean isFileEligibleToSkipDelete(boolean forPerDir, Set<String> activeDirs, File dir, File file) throws IOException {
@@ -186,7 +191,11 @@ public class DirectoryCleaner {
break;
}
}
+ } catch (IOException e) {
+ ExceptionMeters.NUM_FILE_OPEN_EXCEPTIONS.mark();
+ throw e;
}
return files;
}
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/ExceptionMeters.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/ExceptionMeters.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/ExceptionMeters.java
new file mode 100644
index 0000000..81aa222
--- /dev/null
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/ExceptionMeters.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
+ * See the NOTICE file distributed with this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.storm.daemon.logviewer.utils;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public enum ExceptionMeters {
+ //Operation level IO Exceptions
+ NUM_FILE_OPEN_EXCEPTIONS("logviewer:num-file-open-exceptions"),
+ NUM_FILE_READ_EXCEPTIONS("logviewer:num-file-read-exceptions"),
+ NUM_FILE_REMOVAL_EXCEPTIONS("logviewer:num-file-removal-exceptions"),
+ NUM_FILE_DOWNLOAD_EXCEPTIONS("logviewer:num-file-download-exceptions"),
+ NUM_SET_PERMISSION_EXCEPTIONS("logviewer:num-set-permission-exceptions"),
+
+ //Routine level
+ NUM_CLEANUP_EXCEPTIONS("logviewer:num-other-cleanup-exceptions"),
+ NUM_READ_LOG_EXCEPTIONS("logviewer:num-read-log-exceptions"),
+ NUM_READ_DAEMON_LOG_EXCEPTIONS("logviewer:num-read-daemon-log-exceptions"),
+ NUM_LIST_LOG_EXCEPTIONS("logviewer:num-search-log-exceptions"),
+ NUM_LIST_DUMP_EXCEPTIONS("logviewer:num-list-dump-files-exceptions"),
+ NUM_DOWNLOAD_DUMP_EXCEPTIONS("logviewer:num-download-dump-exceptions"),
+ NUM_DOWNLOAD_LOG_EXCEPTIONS("logviewer:num-download-log-exceptions"),
+ NUM_DOWNLOAD_DAEMON_LOG_EXCEPTIONS("logviewer:num-download-daemon-log-exceptions"),
+ NUM_SEARCH_EXCEPTIONS("logviewer:num-search-exceptions");
+
+ private static final Map<String, Metric> metrics = new HashMap<>();
+
+ static {
+ for (ExceptionMeters e : ExceptionMeters.values()) {
+ metrics.put(e.name, e.meter);
+ }
+ }
+
+ private final String name;
+ private final Meter meter;
+
+ public static Map<String, Metric> getMetrics() {
+ return metrics;
+ }
+
+ ExceptionMeters(String name) {
+ this.name = name;
+ meter = new Meter();
+ }
+
+ public void mark() {
+ this.meter.mark();
+ }
+}