Posted to common-commits@hadoop.apache.org by ec...@apache.org on 2015/09/15 21:11:55 UTC
[10/50] [abbrv] hadoop git commit: MAPREDUCE-6415. Create a tool to
combine aggregated logs into HAR files. (Robert Kanter via kasha)
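For context before the diffs: a minimal sketch of driving the new per-application runner. The flag names, constructor, and ToolRunner entry point are taken verbatim from TestHadoopArchiveLogsRunner below; the application id and paths are hypothetical placeholders, and the class is placed in org.apache.hadoop.tools (like the tests) in case the runner's constructor is not public. Treat this as a usage sketch, not the tool's documented CLI.

    package org.apache.hadoop.tools;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class ArchiveLogsRunnerSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new YarnConfiguration();
        // Archive one finished application's aggregated logs into a .har
        // file. All five flags appear in the test's invocation below.
        String[] runnerArgs = new String[] {
            "-appId", "application_1442346466595_0001",   // hypothetical id
            "-user", System.getProperty("user.name"),
            "-workingDir", "/tmp/logs/archive-logs-work", // hypothetical path
            "-remoteRootLogDir", "/tmp/logs",             // hypothetical path
            "-suffix", "logs"};
        System.exit(ToolRunner.run(new HadoopArchiveLogsRunner(conf),
            runnerArgs));
      }
    }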
http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cc75e/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java b/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java
new file mode 100644
index 0000000..c8ff201
--- /dev/null
+++ b/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+
+public class TestHadoopArchiveLogs {
+
+ private static final long CLUSTER_TIMESTAMP = System.currentTimeMillis();
+ private static final int FILE_SIZE_INCREMENT = 4096;
+ private static final byte[] DUMMY_DATA = new byte[FILE_SIZE_INCREMENT];
+ static {
+ new Random().nextBytes(DUMMY_DATA);
+ }
+
+ @Test(timeout = 10000)
+ public void testCheckFiles() throws Exception {
+ Configuration conf = new Configuration();
+ HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
+ FileSystem fs = FileSystem.getLocal(conf);
+ Path rootLogDir = new Path("target", "logs");
+ String suffix = "logs";
+ Path logDir = new Path(rootLogDir,
+ new Path(System.getProperty("user.name"), suffix));
+ fs.mkdirs(logDir);
+
+ Assert.assertEquals(0, hal.eligibleApplications.size());
+ ApplicationReport app1 = createAppReport(1); // no files found
+ ApplicationReport app2 = createAppReport(2); // too few files
+ Path app2Path = new Path(logDir, app2.getApplicationId().toString());
+ fs.mkdirs(app2Path);
+ createFile(fs, new Path(app2Path, "file1"), 1);
+ hal.minNumLogFiles = 2;
+ ApplicationReport app3 = createAppReport(3); // too large
+ Path app3Path = new Path(logDir, app3.getApplicationId().toString());
+ fs.mkdirs(app3Path);
+ createFile(fs, new Path(app3Path, "file1"), 2);
+ createFile(fs, new Path(app3Path, "file2"), 5);
+ hal.maxTotalLogsSize = FILE_SIZE_INCREMENT * 6;
+ ApplicationReport app4 = createAppReport(4); // has har already
+ Path app4Path = new Path(logDir, app4.getApplicationId().toString());
+ fs.mkdirs(app4Path);
+ createFile(fs, new Path(app4Path, app4.getApplicationId() + ".har"), 1);
+ ApplicationReport app5 = createAppReport(5); // just right
+ Path app5Path = new Path(logDir, app5.getApplicationId().toString());
+ fs.mkdirs(app5Path);
+ createFile(fs, new Path(app5Path, "file1"), 2);
+ createFile(fs, new Path(app5Path, "file2"), 3);
+ hal.eligibleApplications.add(app1);
+ hal.eligibleApplications.add(app2);
+ hal.eligibleApplications.add(app3);
+ hal.eligibleApplications.add(app4);
+ hal.eligibleApplications.add(app5);
+
+ hal.checkFiles(fs, rootLogDir, suffix);
+ Assert.assertEquals(1, hal.eligibleApplications.size());
+ Assert.assertEquals(app5, hal.eligibleApplications.iterator().next());
+ }
+
+ @Test(timeout = 10000)
+ public void testCheckMaxEligible() throws Exception {
+ Configuration conf = new Configuration();
+ HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
+ ApplicationReport app1 = createAppReport(1);
+ app1.setFinishTime(CLUSTER_TIMESTAMP - 5);
+ ApplicationReport app2 = createAppReport(2);
+ app2.setFinishTime(CLUSTER_TIMESTAMP - 10);
+ ApplicationReport app3 = createAppReport(3);
+ app3.setFinishTime(CLUSTER_TIMESTAMP + 5);
+ ApplicationReport app4 = createAppReport(4);
+ app4.setFinishTime(CLUSTER_TIMESTAMP + 10);
+ ApplicationReport app5 = createAppReport(5);
+ app5.setFinishTime(CLUSTER_TIMESTAMP);
+ Assert.assertEquals(0, hal.eligibleApplications.size());
+ hal.eligibleApplications.add(app1);
+ hal.eligibleApplications.add(app2);
+ hal.eligibleApplications.add(app3);
+ hal.eligibleApplications.add(app4);
+ hal.eligibleApplications.add(app5);
+ hal.maxEligible = -1;
+ hal.checkMaxEligible();
+ Assert.assertEquals(5, hal.eligibleApplications.size());
+
+ hal.maxEligible = 4;
+ hal.checkMaxEligible();
+ Assert.assertEquals(4, hal.eligibleApplications.size());
+ Assert.assertFalse(hal.eligibleApplications.contains(app4));
+
+ hal.maxEligible = 3;
+ hal.checkMaxEligible();
+ Assert.assertEquals(3, hal.eligibleApplications.size());
+ Assert.assertFalse(hal.eligibleApplications.contains(app3));
+
+ hal.maxEligible = 2;
+ hal.checkMaxEligible();
+ Assert.assertEquals(2, hal.eligibleApplications.size());
+ Assert.assertFalse(hal.eligibleApplications.contains(app5));
+
+ hal.maxEligible = 1;
+ hal.checkMaxEligible();
+ Assert.assertEquals(1, hal.eligibleApplications.size());
+ Assert.assertFalse(hal.eligibleApplications.contains(app1));
+ }
+
+ @Test(timeout = 10000)
+ public void testFindAggregatedApps() throws Exception {
+ MiniYARNCluster yarnCluster = null;
+ try {
+ Configuration conf = new Configuration();
+ conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+ yarnCluster =
+ new MiniYARNCluster(TestHadoopArchiveLogs.class.getSimpleName(), 1,
+ 1, 1, 1);
+ yarnCluster.init(conf);
+ yarnCluster.start();
+ conf = yarnCluster.getConfig();
+
+ RMContext rmContext = yarnCluster.getResourceManager().getRMContext();
+ RMAppImpl app1 = (RMAppImpl)createRMApp(1, conf, rmContext,
+ LogAggregationStatus.DISABLED);
+ RMAppImpl app2 = (RMAppImpl)createRMApp(2, conf, rmContext,
+ LogAggregationStatus.FAILED);
+ RMAppImpl app3 = (RMAppImpl)createRMApp(3, conf, rmContext,
+ LogAggregationStatus.NOT_START);
+ RMAppImpl app4 = (RMAppImpl)createRMApp(4, conf, rmContext,
+ LogAggregationStatus.SUCCEEDED);
+ RMAppImpl app5 = (RMAppImpl)createRMApp(5, conf, rmContext,
+ LogAggregationStatus.RUNNING);
+ RMAppImpl app6 = (RMAppImpl)createRMApp(6, conf, rmContext,
+ LogAggregationStatus.RUNNING_WITH_FAILURE);
+ RMAppImpl app7 = (RMAppImpl)createRMApp(7, conf, rmContext,
+ LogAggregationStatus.TIME_OUT);
+ rmContext.getRMApps().put(app1.getApplicationId(), app1);
+ rmContext.getRMApps().put(app2.getApplicationId(), app2);
+ rmContext.getRMApps().put(app3.getApplicationId(), app3);
+ rmContext.getRMApps().put(app4.getApplicationId(), app4);
+ rmContext.getRMApps().put(app5.getApplicationId(), app5);
+ rmContext.getRMApps().put(app6.getApplicationId(), app6);
+ rmContext.getRMApps().put(app7.getApplicationId(), app7);
+
+ HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
+ Assert.assertEquals(0, hal.eligibleApplications.size());
+ hal.findAggregatedApps();
+ Assert.assertEquals(2, hal.eligibleApplications.size());
+ } finally {
+ if (yarnCluster != null) {
+ yarnCluster.stop();
+ }
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void testGenerateScript() throws Exception {
+ Configuration conf = new Configuration();
+ HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
+ ApplicationReport app1 = createAppReport(1);
+ ApplicationReport app2 = createAppReport(2);
+ hal.eligibleApplications.add(app1);
+ hal.eligibleApplications.add(app2);
+
+ File localScript = new File("target", "script.sh");
+ Path workingDir = new Path("/tmp", "working");
+ Path remoteRootLogDir = new Path("/tmp", "logs");
+ String suffix = "logs";
+ localScript.delete();
+ Assert.assertFalse(localScript.exists());
+ hal.generateScript(localScript, workingDir, remoteRootLogDir, suffix);
+ Assert.assertTrue(localScript.exists());
+ String script = IOUtils.toString(localScript.toURI());
+ String[] lines = script.split(System.lineSeparator());
+ Assert.assertEquals(16, lines.length);
+ Assert.assertEquals("#!/bin/bash", lines[0]);
+ Assert.assertEquals("set -e", lines[1]);
+ Assert.assertEquals("set -x", lines[2]);
+ Assert.assertEquals("if [ \"$YARN_SHELL_ID\" == \"1\" ]; then", lines[3]);
+ if (lines[4].contains(app1.getApplicationId().toString())) {
+ Assert.assertEquals("\tappId=\"" + app1.getApplicationId().toString()
+ + "\"", lines[4]);
+ Assert.assertEquals("\tappId=\"" + app2.getApplicationId().toString()
+ + "\"", lines[7]);
+ } else {
+ Assert.assertEquals("\tappId=\"" + app2.getApplicationId().toString()
+ + "\"", lines[4]);
+ Assert.assertEquals("\tappId=\"" + app1.getApplicationId().toString()
+ + "\"", lines[7]);
+ }
+ Assert.assertEquals("\tuser=\"" + System.getProperty("user.name") + "\"",
+ lines[5]);
+ Assert.assertEquals("elif [ \"$YARN_SHELL_ID\" == \"2\" ]; then", lines[6]);
+ Assert.assertEquals("\tuser=\"" + System.getProperty("user.name") + "\"",
+ lines[8]);
+ Assert.assertEquals("else", lines[9]);
+ Assert.assertEquals("\techo \"Unknown Mapping!\"", lines[10]);
+ Assert.assertEquals("\texit 1", lines[11]);
+ Assert.assertEquals("fi", lines[12]);
+ Assert.assertEquals("export HADOOP_CLIENT_OPTS=\"-Xmx1024m\"", lines[13]);
+ Assert.assertTrue(lines[14].startsWith("export HADOOP_CLASSPATH="));
+ Assert.assertEquals("\"$HADOOP_HOME\"/bin/hadoop org.apache.hadoop.tools." +
+ "HadoopArchiveLogsRunner -appId \"$appId\" -user \"$user\" -workingDir "
+ + workingDir.toString() + " -remoteRootLogDir " +
+ remoteRootLogDir.toString() + " -suffix " + suffix, lines[15]);
+ }
+
+ private static ApplicationReport createAppReport(int id) {
+ ApplicationId appId = ApplicationId.newInstance(CLUSTER_TIMESTAMP, id);
+ return ApplicationReport.newInstance(
+ appId,
+ ApplicationAttemptId.newInstance(appId, 1),
+ System.getProperty("user.name"),
+ null, null, null, 0, null, YarnApplicationState.FINISHED, null,
+ null, 0L, 0L, FinalApplicationStatus.SUCCEEDED, null, null, 100f,
+ null, null);
+ }
+
+ private static void createFile(FileSystem fs, Path p, long sizeMultiple)
+ throws IOException {
+ FSDataOutputStream out = null;
+ try {
+ out = fs.create(p);
+ for (int i = 0; i < sizeMultiple; i++) {
+ out.write(DUMMY_DATA);
+ }
+ } finally {
+ if (out != null) {
+ out.close();
+ }
+ }
+ }
+
+ private static RMApp createRMApp(int id, Configuration conf, RMContext rmContext,
+ final LogAggregationStatus aggStatus) {
+ ApplicationId appId = ApplicationId.newInstance(CLUSTER_TIMESTAMP, id);
+ ApplicationSubmissionContext submissionContext =
+ ApplicationSubmissionContext.newInstance(appId, "test", "default",
+ Priority.newInstance(0), null, false, true,
+ 2, Resource.newInstance(10, 2), "test");
+ return new RMAppImpl(appId, rmContext, conf, "test",
+ System.getProperty("user.name"), "default", submissionContext,
+ rmContext.getScheduler(),
+ rmContext.getApplicationMasterService(),
+ System.currentTimeMillis(), "test",
+ null, null) {
+ @Override
+ public ApplicationReport createAndGetApplicationReport(
+ String clientUserName, boolean allowAccess) {
+ ApplicationReport report =
+ super.createAndGetApplicationReport(clientUserName, allowAccess);
+ report.setLogAggregationStatus(aggStatus);
+ return report;
+ }
+ };
+ }
+}
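Taken together, the four tests above trace the tool's pipeline: findAggregatedApps() collects applications whose log aggregation completed (given the expected count of 2, presumably SUCCEEDED and TIME_OUT), checkFiles() drops apps with no logs, too few files, oversized logs, or an existing .har, checkMaxEligible() trims the set by discarding the most recently finished apps, and generateScript() writes the shell driver that runs HadoopArchiveLogsRunner once per remaining app. A sketch of that sequence, assuming a running cluster and mirroring the test's threshold values; it uses the package-private members the tests exercise, so it must live in org.apache.hadoop.tools to compile, and the paths are hypothetical:

    package org.apache.hadoop.tools;

    import java.io.File;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ArchiveLogsPipelineSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
        hal.minNumLogFiles = 2;          // skip apps with too few log files
        hal.maxTotalLogsSize = 6 * 4096; // skip apps whose logs are too large
        hal.maxEligible = 3;             // archive at most 3 apps per run
        hal.findAggregatedApps();        // apps whose aggregation completed
        hal.checkFiles(fs, new Path("/tmp/logs"), "logs");
        hal.checkMaxEligible();          // drops the most recently finished
        hal.generateScript(new File("target", "script.sh"),
            new Path("/tmp/working"), new Path("/tmp/logs"), "logs");
      }
    }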
http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cc75e/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java b/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java
new file mode 100644
index 0000000..af66f14
--- /dev/null
+++ b/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.HarFs;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestHadoopArchiveLogsRunner {
+
+ private static final int FILE_SIZE_INCREMENT = 4096;
+ private static final byte[] DUMMY_DATA = new byte[FILE_SIZE_INCREMENT];
+ static {
+ new Random().nextBytes(DUMMY_DATA);
+ }
+
+ @Test(timeout = 30000)
+ public void testHadoopArchiveLogs() throws Exception {
+ MiniYARNCluster yarnCluster = null;
+ MiniDFSCluster dfsCluster = null;
+ FileSystem fs = null;
+ try {
+ Configuration conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+ conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
+ yarnCluster =
+ new MiniYARNCluster(TestHadoopArchiveLogsRunner.class.getSimpleName(),
+ 1, 2, 1, 1);
+ yarnCluster.init(conf);
+ yarnCluster.start();
+ conf = yarnCluster.getConfig();
+ dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+
+ ApplicationId app1 =
+ ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ fs = FileSystem.get(conf);
+ Path remoteRootLogDir = new Path(conf.get(
+ YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+ YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+ Path workingDir = new Path(remoteRootLogDir, "archive-logs-work");
+ String suffix = "logs";
+ Path logDir = new Path(remoteRootLogDir,
+ new Path(System.getProperty("user.name"), suffix));
+ fs.mkdirs(logDir);
+ Path app1Path = new Path(logDir, app1.toString());
+ fs.mkdirs(app1Path);
+ createFile(fs, new Path(app1Path, "log1"), 3);
+ createFile(fs, new Path(app1Path, "log2"), 4);
+ createFile(fs, new Path(app1Path, "log3"), 2);
+ FileStatus[] app1Files = fs.listStatus(app1Path);
+ Assert.assertEquals(3, app1Files.length);
+
+ String[] args = new String[]{
+ "-appId", app1.toString(),
+ "-user", System.getProperty("user.name"),
+ "-workingDir", workingDir.toString(),
+ "-remoteRootLogDir", remoteRootLogDir.toString(),
+ "-suffix", suffix};
+ final HadoopArchiveLogsRunner halr = new HadoopArchiveLogsRunner(conf);
+ assertEquals(0, ToolRunner.run(halr, args));
+
+ fs = FileSystem.get(conf);
+ app1Files = fs.listStatus(app1Path);
+ Assert.assertEquals(1, app1Files.length);
+ FileStatus harFile = app1Files[0];
+ Assert.assertEquals(app1.toString() + ".har", harFile.getPath().getName());
+ Path harPath = new Path("har:///" + harFile.getPath().toUri().getRawPath());
+ FileStatus[] harLogs = HarFs.get(harPath.toUri(), conf).listStatus(harPath);
+ Assert.assertEquals(3, harLogs.length);
+ Arrays.sort(harLogs, new Comparator<FileStatus>() {
+ @Override
+ public int compare(FileStatus o1, FileStatus o2) {
+ return o1.getPath().getName().compareTo(o2.getPath().getName());
+ }
+ });
+ Assert.assertEquals("log1", harLogs[0].getPath().getName());
+ Assert.assertEquals(3 * FILE_SIZE_INCREMENT, harLogs[0].getLen());
+ Assert.assertEquals("log2", harLogs[1].getPath().getName());
+ Assert.assertEquals(4 * FILE_SIZE_INCREMENT, harLogs[1].getLen());
+ Assert.assertEquals("log3", harLogs[2].getPath().getName());
+ Assert.assertEquals(2 * FILE_SIZE_INCREMENT, harLogs[2].getLen());
+ Assert.assertEquals(0, fs.listStatus(workingDir).length);
+ } finally {
+ if (yarnCluster != null) {
+ yarnCluster.stop();
+ }
+ if (fs != null) {
+ fs.close();
+ }
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ }
+ }
+ }
+
+ private static void createFile(FileSystem fs, Path p, long sizeMultiple)
+ throws IOException {
+ FSDataOutputStream out = null;
+ try {
+ out = fs.create(p);
+ for (int i = 0; i < sizeMultiple; i++) {
+ out.write(DUMMY_DATA);
+ }
+ } finally {
+ if (out != null) {
+ out.close();
+ }
+ }
+ }
+}
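Once the runner exits 0, the application's log directory holds a single <appId>.har whose entries stay readable through the har filesystem, as the test above asserts with HarFs. A minimal read-back sketch; the har path is a hypothetical placeholder following the <remoteRootLogDir>/<user>/<suffix>/<appId>.har layout the test builds:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.HarFs;
    import org.apache.hadoop.fs.Path;

    public class ListArchivedLogsSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical archive location for one application's logs.
        Path har = new Path(
            "har:///tmp/logs/hadoop/logs/application_1442346466595_0001.har");
        // List the aggregated log files preserved inside the archive.
        for (FileStatus stat : HarFs.get(har.toUri(), conf).listStatus(har)) {
          System.out.println(stat.getPath().getName() + "\t" + stat.getLen());
        }
      }
    }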
http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cc75e/hadoop-tools/hadoop-tools-dist/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-tools-dist/pom.xml b/hadoop-tools/hadoop-tools-dist/pom.xml
index 540401d..e6c458f 100644
--- a/hadoop-tools/hadoop-tools-dist/pom.xml
+++ b/hadoop-tools/hadoop-tools-dist/pom.xml
@@ -52,6 +52,11 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-archive-logs</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-rumen</artifactId>
<scope>compile</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cc75e/hadoop-tools/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/pom.xml b/hadoop-tools/pom.xml
index 5b35f46..0061bf0 100644
--- a/hadoop-tools/pom.xml
+++ b/hadoop-tools/pom.xml
@@ -34,6 +34,7 @@
<module>hadoop-streaming</module>
<module>hadoop-distcp</module>
<module>hadoop-archives</module>
+ <module>hadoop-archive-logs</module>
<module>hadoop-rumen</module>
<module>hadoop-gridmix</module>
<module>hadoop-datajoin</module>