You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/05/23 17:12:52 UTC
[04/18] hadoop git commit: YARN-8273. Log aggregation does not warn if HDFS quota in target directory is exceeded (grepas via rkanter)
YARN-8273. Log aggregation does not warn if HDFS quota in target directory is exceeded (grepas via rkanter)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b22f56c4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b22f56c4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b22f56c4
Branch: refs/heads/HDDS-48
Commit: b22f56c4719e63bd4f6edc2a075e0bcdb9442255
Parents: 83f53e5
Author: Robert Kanter <rk...@apache.org>
Authored: Tue May 22 14:24:38 2018 -0700
Committer: Robert Kanter <rk...@apache.org>
Committed: Tue May 22 14:24:38 2018 -0700
----------------------------------------------------------------------
.../hadoop-yarn/hadoop-yarn-common/pom.xml | 4 ++
.../logaggregation/AggregatedLogFormat.java | 14 +++-
.../LogAggregationDFSException.java | 45 ++++++++++++
.../LogAggregationFileController.java | 4 +-
.../tfile/LogAggregationTFileController.java | 13 +++-
.../logaggregation/TestContainerLogsUtils.java | 4 +-
.../logaggregation/AppLogAggregatorImpl.java | 49 ++++++++++---
.../TestAppLogAggregatorImpl.java | 75 +++++++++++++++++---
.../nodemanager/webapp/TestNMWebServices.java | 7 +-
9 files changed, 183 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
index db6c11a..a25c524 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
@@ -40,6 +40,10 @@
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs-client</artifactId>
+ </dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index af3066e..81d5053 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.io.Writable;
@@ -547,7 +548,7 @@ public class AggregatedLogFormat {
}
@Override
- public void close() {
+ public void close() throws DSQuotaExceededException {
try {
if (writer != null) {
writer.close();
@@ -555,7 +556,16 @@ public class AggregatedLogFormat {
} catch (Exception e) {
LOG.warn("Exception closing writer", e);
} finally {
- IOUtils.cleanupWithLogger(LOG, this.fsDataOStream);
+ try {
+ this.fsDataOStream.close();
+ } catch (DSQuotaExceededException e) {
+ LOG.error("Exception in closing {}",
+ this.fsDataOStream.getClass(), e);
+ throw e;
+ } catch (Throwable e) {
+ LOG.error("Exception in closing {}",
+ this.fsDataOStream.getClass(), e);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationDFSException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationDFSException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationDFSException.java
new file mode 100644
index 0000000..19953e4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationDFSException.java
@@ -0,0 +1,45 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * Indicates a DFS failure during log aggregation, e.g. HDFS quota exceeded.
+ */
+public class LogAggregationDFSException extends YarnException {
+
+ private static final long serialVersionUID = -6691549081090183145L;
+
+ public LogAggregationDFSException() {
+ }
+
+ public LogAggregationDFSException(String message) {
+ super(message);
+ }
+
+ public LogAggregationDFSException(Throwable cause) {
+ super(cause);
+ }
+
+ public LogAggregationDFSException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
index 5ac89e9..d342e3f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
@@ -162,8 +162,10 @@ public abstract class LogAggregationFileController {
/**
* Close the writer.
+ * @throws LogAggregationDFSException if the closing of the writer fails
+ * (for example due to HDFS quota being exceeded)
*/
- public abstract void closeWriter();
+ public abstract void closeWriter() throws LogAggregationDFSException;
/**
* Write the log content.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
index a4f50d2..e87af7f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.HarFs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationDFSException;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
@@ -95,10 +97,15 @@ public class LogAggregationTFileController
}
@Override
- public void closeWriter() {
+ public void closeWriter() throws LogAggregationDFSException {
if (this.writer != null) {
- this.writer.close();
- this.writer = null;
+ try {
+ this.writer.close();
+ } catch (DSQuotaExceededException e) {
+ throw new LogAggregationDFSException(e);
+ } finally {
+ this.writer = null;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
index a12e2a1..4767282 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
@@ -65,7 +65,7 @@ public final class TestContainerLogsUtils {
public static void createContainerLogFileInRemoteFS(Configuration conf,
FileSystem fs, String rootLogDir, ContainerId containerId, NodeId nodeId,
String fileName, String user, String content,
- boolean deleteRemoteLogDir) throws IOException {
+ boolean deleteRemoteLogDir) throws Exception {
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
//prepare the logs for remote directory
ApplicationId appId = containerId.getApplicationAttemptId()
@@ -113,7 +113,7 @@ public final class TestContainerLogsUtils {
private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi,
Configuration configuration, List<String> rootLogDirs, NodeId nodeId,
- ContainerId containerId, Path appDir, FileSystem fs) throws IOException {
+ ContainerId containerId, Path appDir, FileSystem fs) throws Exception {
Path path =
new Path(appDir, LogAggregationUtils.getNodeString(nodeId));
LogAggregationFileControllerFactory factory
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index c7e06ff..5956823 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationDFSException;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
@@ -263,7 +264,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
return params;
}
- private void uploadLogsForContainers(boolean appFinished) {
+ private void uploadLogsForContainers(boolean appFinished)
+ throws LogAggregationDFSException {
if (this.logAggregationDisabled) {
return;
}
@@ -301,6 +303,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
logAggregationTimes++;
String diagnosticMessage = "";
boolean logAggregationSucceedInThisCycle = true;
+ DeletionTask deletionTask = null;
try {
try {
logAggregationFileController.initializeWriter(logControllerContext);
@@ -327,10 +330,9 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
uploadedLogsInThisCycle = true;
List<Path> uploadedFilePathsInThisCycleList = new ArrayList<>();
uploadedFilePathsInThisCycleList.addAll(uploadedFilePathsInThisCycle);
- DeletionTask deletionTask = new FileDeletionTask(delService,
+ deletionTask = new FileDeletionTask(delService,
this.userUgi.getShortUserName(), null,
uploadedFilePathsInThisCycleList);
- delService.delete(deletionTask);
}
// This container is finished, and all its logs have been uploaded,
@@ -356,9 +358,23 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
logAggregationSucceedInThisCycle = false;
}
} finally {
+ LogAggregationDFSException exc = null;
+ try {
+ this.logAggregationFileController.closeWriter();
+ } catch (LogAggregationDFSException e) {
+ diagnosticMessage = e.getMessage();
+ renameTemporaryLogFileFailed = true;
+ logAggregationSucceedInThisCycle = false;
+ exc = e;
+ }
+ if (logAggregationSucceedInThisCycle && deletionTask != null) {
+ delService.delete(deletionTask);
+ }
sendLogAggregationReport(logAggregationSucceedInThisCycle,
diagnosticMessage, appFinished);
- logAggregationFileController.closeWriter();
+ if (exc != null) {
+ throw exc;
+ }
}
}
@@ -413,13 +429,18 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
diagnosticMessage, finalized);
}
- @SuppressWarnings("unchecked")
@Override
public void run() {
try {
doAppLogAggregation();
+ } catch (LogAggregationDFSException e) {
+ // if the log aggregation could not be performed due to DFS issues
+ // let's not clean up the log files, since that can result in
+ // loss of logs
+ LOG.error("Error occurred while aggregating the log for the application "
+ + appId, e);
} catch (Exception e) {
- // do post clean up of log directories on any exception
+ // do post clean up of log directories on any other exception
LOG.error("Error occurred while aggregating the log for the application "
+ appId, e);
doAppLogAggregationPostCleanUp();
@@ -434,8 +455,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
}
}
- @SuppressWarnings("unchecked")
- private void doAppLogAggregation() {
+ private void doAppLogAggregation() throws LogAggregationDFSException {
while (!this.appFinishing.get() && !this.aborted.get()) {
synchronized(this) {
try {
@@ -452,6 +472,9 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
} catch (InterruptedException e) {
LOG.warn("PendingContainers queue is interrupted");
this.appFinishing.set(true);
+ } catch (LogAggregationDFSException e) {
+ this.appFinishing.set(true);
+ throw e;
}
}
}
@@ -460,10 +483,14 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
return;
}
- // App is finished, upload the container logs.
- uploadLogsForContainers(true);
+ try {
+ // App is finished, upload the container logs.
+ uploadLogsForContainers(true);
- doAppLogAggregationPostCleanUp();
+ doAppLogAggregationPostCleanUp();
+ } catch (LogAggregationDFSException e) {
+ LOG.error("Error during log aggregation", e);
+ }
this.dispatcher.getEventHandler().handle(
new ApplicationEvent(this.appId,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
index e13c805..95f4c32 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationDFSException;
import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
import org.apache.hadoop.yarn.server.api.ContainerType;
@@ -42,7 +43,9 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
+import org.apache.hadoop.yarn.server.nodemanager.logaggregation.tracker.NMLogAggregationStatusTracker;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
@@ -52,12 +55,14 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -228,10 +233,15 @@ public class TestAppLogAggregatorImpl {
config.setLong(
YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, logRetentionSecs);
+ LogAggregationTFileController format = spy(
+ new LogAggregationTFileController());
+ format.initialize(config, "TFile");
+
+ Context context = createContext(config);
final AppLogAggregatorInTest appLogAggregator =
createAppLogAggregator(appId, LOCAL_LOG_DIR.getAbsolutePath(),
- config, recoveredLogInitedTimeMillis,
- deletionServiceWithExpectedFiles);
+ config, context, recoveredLogInitedTimeMillis,
+ deletionServiceWithExpectedFiles, format);
appLogAggregator.startContainerLogAggregation(
new ContainerLogContext(containerId, ContainerType.TASK, 0));
// set app finished flag first
@@ -269,8 +279,10 @@ public class TestAppLogAggregatorImpl {
private static AppLogAggregatorInTest createAppLogAggregator(
ApplicationId applicationId, String rootLogDir,
- YarnConfiguration config, long recoveredLogInitedTimeMillis,
- DeletionService deletionServiceWithFilesToExpect)
+ YarnConfiguration config, Context context,
+ long recoveredLogInitedTimeMillis,
+ DeletionService deletionServiceWithFilesToExpect,
+ LogAggregationTFileController tFileController)
throws IOException {
final Dispatcher dispatcher = createNullDispatcher();
@@ -284,16 +296,12 @@ public class TestAppLogAggregatorImpl {
final LogAggregationContext logAggregationContext = null;
final Map<ApplicationAccessType, String> appAcls = new HashMap<>();
- final Context context = createContext(config);
final FileContext fakeLfs = mock(FileContext.class);
final Path remoteLogDirForApp = new Path(REMOTE_LOG_FILE.getAbsolutePath());
- LogAggregationTFileController format = spy(
- new LogAggregationTFileController());
- format.initialize(config, "TFile");
return new AppLogAggregatorInTest(dispatcher, deletionService,
config, applicationId, ugi, nodeId, dirsService,
remoteLogDirForApp, appAcls, logAggregationContext,
- context, fakeLfs, recoveredLogInitedTimeMillis, format);
+ context, fakeLfs, recoveredLogInitedTimeMillis, tFileController);
}
/**
@@ -423,4 +431,53 @@ public class TestAppLogAggregatorImpl {
this.logValue = ArgumentCaptor.forClass(LogValue.class);
}
}
+
+ @Test
+ public void testDFSQuotaExceeded() throws Exception {
+
+ // the expectation is that no log files are deleted if the quota has
+ // been exceeded, since that would result in loss of logs
+ DeletionService deletionServiceWithExpectedFiles =
+ createDeletionServiceWithExpectedFile2Delete(Collections.emptySet());
+
+ final YarnConfiguration config = new YarnConfiguration();
+
+ ApplicationId appId = ApplicationId.newInstance(1357543L, 1);
+
+ // we need a LogAggregationTFileController that throws a
+ // LogAggregationDFSException
+ LogAggregationTFileController format =
+ Mockito.mock(LogAggregationTFileController.class);
+ Mockito.doThrow(new LogAggregationDFSException())
+ .when(format).closeWriter();
+
+ NodeManager.NMContext context = (NMContext) createContext(config);
+ context.setNMLogAggregationStatusTracker(
+ Mockito.mock(NMLogAggregationStatusTracker.class));
+
+ final AppLogAggregatorInTest appLogAggregator =
+ createAppLogAggregator(appId, LOCAL_LOG_DIR.getAbsolutePath(),
+ config, context, 1000L, deletionServiceWithExpectedFiles, format);
+
+ appLogAggregator.startContainerLogAggregation(
+ new ContainerLogContext(
+ ContainerId.newContainerId(
+ ApplicationAttemptId.newInstance(appId, 0), 0),
+ ContainerType.TASK, 0));
+ // set app finished flag first
+ appLogAggregator.finishLogAggregation();
+ appLogAggregator.run();
+
+ // verify that no files have been uploaded
+ ArgumentCaptor<LogValue> logValCaptor =
+ ArgumentCaptor.forClass(LogValue.class);
+ verify(appLogAggregator.getLogAggregationFileController()).write(
+ any(LogKey.class), logValCaptor.capture());
+ Set<String> filesUploaded = new HashSet<>();
+ LogValue logValue = logValCaptor.getValue();
+ for (File file: logValue.getPendingLogFilesToUploadForThisContainer()) {
+ filesUploaded.add(file.getAbsolutePath());
+ }
+ verifyFilesUploaded(filesUploaded, Collections.emptySet());
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
index 39e403d..dbd980b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
@@ -87,7 +87,6 @@ import javax.ws.rs.core.MediaType;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import java.io.File;
-import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringReader;
import java.net.HttpURLConnection;
@@ -356,7 +355,7 @@ public class TestNMWebServices extends JerseyTestBase {
}
@Test (timeout = 5000)
- public void testContainerLogsWithNewAPI() throws IOException, JSONException{
+ public void testContainerLogsWithNewAPI() throws Exception {
final ContainerId containerId = BuilderUtils.newContainerId(0, 0, 0, 0);
WebResource r = resource();
r = r.path("ws").path("v1").path("node").path("containers")
@@ -365,7 +364,7 @@ public class TestNMWebServices extends JerseyTestBase {
}
@Test (timeout = 5000)
- public void testContainerLogsWithOldAPI() throws IOException, JSONException{
+ public void testContainerLogsWithOldAPI() throws Exception {
final ContainerId containerId = BuilderUtils.newContainerId(1, 1, 0, 1);
WebResource r = resource();
r = r.path("ws").path("v1").path("node").path("containerlogs")
@@ -538,7 +537,7 @@ public class TestNMWebServices extends JerseyTestBase {
}
private void testContainerLogs(WebResource r, ContainerId containerId)
- throws IOException {
+ throws Exception {
final String containerIdStr = containerId.toString();
final ApplicationAttemptId appAttemptId = containerId
.getApplicationAttemptId();
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org