Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2022/06/22 22:59:15 UTC

[GitHub] [hadoop] mukund-thakur commented on a diff in pull request #4383: HADOOP-18258. Merging of S3A Audit Logs

mukund-thakur commented on code in PR #4383:
URL: https://github.com/apache/hadoop/pull/4383#discussion_r904350867


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditTool.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.audit;
+
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_COMMAND_ARGUMENT_ERROR;
+import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_SERVICE_UNAVAILABLE;
+import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_SUCCESS;
+
+/**
+ * AuditTool is a command line interface whose function is to parse
+ * the merged audit log file and generate an Avro file.
+ */
+public class AuditTool extends Configured implements Tool, Closeable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AuditTool.class);
+
+  private final String entryPoint = "s3audit";
+
+  private PrintWriter out;
+
+  // Exit codes
+  private static final int SUCCESS = EXIT_SUCCESS;
+  private static final int INVALID_ARGUMENT = EXIT_COMMAND_ARGUMENT_ERROR;
+
+  /**
+   * Error String when the wrong FS is used for binding: {@value}.
+   **/
+  @VisibleForTesting
+  public static final String WRONG_FILESYSTEM = "Wrong filesystem for ";
+
+  private final String usage = entryPoint + "  s3a://BUCKET\n";
+
+  public AuditTool() {
+  }
+
+  /**
+   * Returns the usage string of the AuditTool command.
+   *
+   * @return the usage string.
+   */
+  public String getUsage() {
+    return usage;
+  }
+
+  /**
+   * Takes the path of an S3 bucket containing audit log files from the
+   * command line arguments, and merges the audit log files under that
+   * path into a single file on the local system.
+   *
+   * @param args command specific arguments.
+   * @return SUCCESS, i.e. the exit code '0'.
+   * @throws Exception on any failure.
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    List<String> argv = new ArrayList<>(Arrays.asList(args));
+    if (argv.isEmpty()) {
+      errorln(getUsage());
+      throw invalidArgs("No bucket specified");
+    }
+    //Path of audit log files in s3 bucket
+    Path s3LogsPath = new Path(argv.get(0));
+
+    //Setting the file system
+    URI fsURI = toUri(String.valueOf(s3LogsPath));
+    S3AFileSystem s3AFileSystem =
+        bindFilesystem(FileSystem.newInstance(fsURI, getConf()));
+    RemoteIterator<LocatedFileStatus> listOfS3LogFiles =
+        s3AFileSystem.listFiles(s3LogsPath, true);
+
+    //Merging local audit files into a single file
+    File s3aLogsDirectory = new File(s3LogsPath.getName());
+    boolean s3aLogsDirectoryCreation = false;
+    if (!s3aLogsDirectory.exists()) {
+      s3aLogsDirectoryCreation = s3aLogsDirectory.mkdir();
+    }
+    if (s3aLogsDirectoryCreation) {
+      while (listOfS3LogFiles.hasNext()) {
+        Path s3LogFilePath = listOfS3LogFiles.next().getPath();
+        File s3LogLocalFilePath =
+            new File(s3aLogsDirectory, s3LogFilePath.getName());
+        boolean localFileCreation = s3LogLocalFilePath.createNewFile();
+        if (localFileCreation) {
+          FileStatus fileStatus = s3AFileSystem.getFileStatus(s3LogFilePath);
+          long s3LogFileLength = fileStatus.getLen();
+          //Reads s3 file data into byte buffer
+          byte[] s3LogDataBuffer =
+              readDataset(s3AFileSystem, s3LogFilePath, (int) s3LogFileLength);
+          //Writes byte array into local file
+          FileUtils.writeByteArrayToFile(s3LogLocalFilePath, s3LogDataBuffer);
+        }
+      }
+    }
+
+    //Calls S3AAuditLogMerger for implementing merging code
+    //by passing local audit log files directory which are copied from s3 bucket
+    S3AAuditLogMerger s3AAuditLogMerger = new S3AAuditLogMerger();
+    s3AAuditLogMerger.mergeFiles(s3aLogsDirectory.getPath());

Review Comment:
   mergeFiles() should return true/false to indicate whether anything was actually merged.
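   Something like this, as a rough sketch of the same method returning a boolean (uses only the imports this class already has):
   ```java
   /**
    * Merge all the audit log files from a directory into a single file.
    * @return true iff there was at least one log file to merge.
    */
   public boolean mergeFiles(String auditLogsDirectoryPath) throws IOException {
     File auditLogFilesDirectory = new File(auditLogsDirectoryPath);
     String[] auditLogFileNames = auditLogFilesDirectory.list();
     if (auditLogFileNames == null || auditLogFileNames.length == 0) {
       return false;                       // nothing to merge; caller can react
     }
     File auditLogFile = new File("AuditLogFile");
     try (PrintWriter printWriter = new PrintWriter(auditLogFile, "UTF-8")) {
       for (String name : auditLogFileNames) {
         File singleAuditLogFile = new File(auditLogFilesDirectory, name);
         try (BufferedReader reader = new BufferedReader(new InputStreamReader(
             new FileInputStream(singleAuditLogFile), "UTF-8"))) {
           String line;
           while ((line = reader.readLine()) != null) {
             printWriter.println(line);
           }
         }
       }
     }
     return true;
   }
   ```
   Then the tool can fail, or skip the Avro generation, when it returns false.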



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditTool.java:
##########
@@ -0,0 +1,308 @@
+  /**
+   * Error String when the wrong FS is used for binding: {@value}.
+   **/
+  @VisibleForTesting
+  public static final String WRONG_FILESYSTEM = "Wrong filesystem for ";

Review Comment:
   You could throw an UnsupportedOperationException with a "not supported" message instead. What if we do something similar for Azure in the future?
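   For example, a sketch of how bindFilesystem could fail in a store-agnostic way (the message wording is illustrative; FileSystem and S3AFileSystem are already imported here):
   ```java
   private S3AFileSystem bindFilesystem(FileSystem filesystem) {
     if (!(filesystem instanceof S3AFileSystem)) {
       // store-agnostic failure, so an ABFS variant can slot in later
       throw new UnsupportedOperationException(
           "Audit log merging is not supported for " + filesystem.getUri());
     }
     return (S3AFileSystem) filesystem;
   }
   ```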



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditTool.java:
##########
@@ -0,0 +1,308 @@
+    //Merging local audit files into a single file
+    File s3aLogsDirectory = new File(s3LogsPath.getName());
+    boolean s3aLogsDirectoryCreation = false;
+    if (!s3aLogsDirectory.exists()) {
+      s3aLogsDirectoryCreation = s3aLogsDirectory.mkdir();
+    }
+    if (s3aLogsDirectoryCreation) {
+      while (listOfS3LogFiles.hasNext()) {
+        Path s3LogFilePath = listOfS3LogFiles.next().getPath();
+        File s3LogLocalFilePath =
+            new File(s3aLogsDirectory, s3LogFilePath.getName());
+        boolean localFileCreation = s3LogLocalFilePath.createNewFile();
+        if (localFileCreation) {
+          FileStatus fileStatus = s3AFileSystem.getFileStatus(s3LogFilePath);
+          long s3LogFileLength = fileStatus.getLen();
+          //Reads s3 file data into byte buffer
+          byte[] s3LogDataBuffer =

Review Comment:
   Yeah, this is going to cause an OOM for bigger files.
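   A streaming copy would avoid holding the whole object in memory; a sketch (assumes extra imports of java.io.FileOutputStream, java.io.OutputStream and org.apache.hadoop.io.IOUtils):
   ```java
   try (FSDataInputStream in = s3AFileSystem.open(s3LogFilePath);
       OutputStream out = new FileOutputStream(s3LogLocalFilePath)) {
     // copy in fixed-size chunks instead of one big byte[]
     IOUtils.copyBytes(in, out, 8192, false);
   }
   ```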



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/S3AAuditLogMerger.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.audit;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Merges all the audit log files present in a directory
+ * into a single audit log file.
+ */
+public class S3AAuditLogMerger {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(S3AAuditLogMerger.class);
+
+  /**
+   * Merge all the audit log files from a directory into a single audit log file.
+   * @param auditLogsDirectoryPath path where audit log files are present.
+   * @throws IOException on any failure.
+   */
+  public void mergeFiles(String auditLogsDirectoryPath) throws IOException {
+    File auditLogFilesDirectory = new File(auditLogsDirectoryPath);
+    String[] auditLogFileNames = auditLogFilesDirectory.list();
+
+    //Merging of audit log files present in a directory into a single audit log file
+    if (auditLogFileNames != null && auditLogFileNames.length != 0) {
+      File auditLogFile = new File("AuditLogFile");
+      try (PrintWriter printWriter = new PrintWriter(auditLogFile,
+          "UTF-8")) {
+        for (String singleAuditLogFileName : auditLogFileNames) {
+          File singleAuditLogFile =
+              new File(auditLogFilesDirectory, singleAuditLogFileName);
+          try (BufferedReader bufferedReader =
+              new BufferedReader(
+                  new InputStreamReader(new FileInputStream(singleAuditLogFile),
+                      "UTF-8"))) {
+            String singleLine = bufferedReader.readLine();
+            while (singleLine != null) {
+              printWriter.println(singleLine);

Review Comment:
   Yes, convert each log line into key -> value pairs.
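   One possible shape, as a simplified sketch: a regex that walks the bracketed, quoted and bare fields of an S3 server access log line (the field list here is shortened and the names are illustrative):
   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;

   public final class AuditLogLineParser {
     // Matches one field: [bracketed timestamp], "quoted string" or bare token.
     private static final Pattern FIELD =
         Pattern.compile("\\[[^\\]]*\\]|\"[^\"]*\"|\\S+");
     // Shortened; the real S3 access log has more trailing fields.
     private static final String[] NAMES = {
         "owner", "bucket", "timestamp", "remoteip", "requester",
         "requestid", "operation", "key", "requesturi", "http"};

     /** Parse one audit log line into fieldname -> value pairs. */
     public static Map<String, String> parseLine(String line) {
       Map<String, String> fields = new LinkedHashMap<>();
       Matcher matcher = FIELD.matcher(line);
       for (String name : NAMES) {
         if (!matcher.find()) {
           break;                 // short or malformed line: keep what we have
         }
         fields.put(name, matcher.group());
       }
       return fields;
     }
   }
   ```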



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/S3AAuditLogMerger.java:
##########
@@ -0,0 +1,78 @@
+  public void mergeFiles(String auditLogsDirectoryPath) throws IOException {
+    File auditLogFilesDirectory = new File(auditLogsDirectoryPath);
+    String[] auditLogFileNames = auditLogFilesDirectory.list();
+
+    //Merging of audit log files present in a directory into a single audit log file
+    if (auditLogFileNames != null && auditLogFileNames.length != 0) {
+      File auditLogFile = new File("AuditLogFile");

Review Comment:
   Why not use an iterator?
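   i.e. something like this sketch, which iterates directory entries lazily instead of materialising the whole name array first:
   ```java
   import java.io.IOException;
   import java.nio.file.DirectoryStream;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.Paths;
   import java.nio.file.StandardOpenOption;

   public final class LazyMergeSketch {
     /** Append each directory entry to the merged file, iterating lazily. */
     public static void merge(String auditLogsDirectoryPath, Path mergedFile)
         throws IOException {
       try (DirectoryStream<Path> entries =
           Files.newDirectoryStream(Paths.get(auditLogsDirectoryPath))) {
         for (Path entry : entries) {      // no String[] of all names up front
           Files.write(mergedFile, Files.readAllLines(entry),
               StandardOpenOption.CREATE, StandardOpenOption.APPEND);
         }
       }
     }
   }
   ```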



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditTool.java:
##########
@@ -0,0 +1,308 @@
+  @Override
+  public int run(String[] args) throws Exception {
+    List<String> argv = new ArrayList<>(Arrays.asList(args));
+    if (argv.isEmpty()) {
+      errorln(getUsage());
+      throw invalidArgs("No bucket specified");
+    }
+    //Path of audit log files in s3 bucket
+    Path s3LogsPath = new Path(argv.get(0));
+

Review Comment:
   Yes, take both the input and the output path as arguments and validate them.
   The logs path should be a directory containing some files; fail if it is empty or does not exist.
   The destination path should not already exist, or should be empty if it does; otherwise print a warning that it is going to be overwritten.
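   Roughly like this fragment sketch (invalidArgs is the helper this class already has; destFs and println are illustrative):
   ```java
   Path logsPath = new Path(argv.get(0));
   Path destPath = new Path(argv.get(1));
   // source must exist (getFileStatus throws FileNotFoundException if not),
   // be a directory, and contain at least one file
   FileStatus logsStatus = s3AFileSystem.getFileStatus(logsPath);
   if (!logsStatus.isDirectory()) {
     throw invalidArgs("Not a directory: " + logsPath);
   }
   if (!s3AFileSystem.listFiles(logsPath, true).hasNext()) {
     throw invalidArgs("No audit log files found under: " + logsPath);
   }
   // warn before clobbering an existing destination
   if (destFs.exists(destPath)) {
     println("Warning: " + destPath + " already exists and will be overwritten");
   }
   ```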



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/S3AAuditLogMerger.java:
##########
@@ -0,0 +1,78 @@
+  public void mergeFiles(String auditLogsDirectoryPath) throws IOException {
+    File auditLogFilesDirectory = new File(auditLogsDirectoryPath);
+    String[] auditLogFileNames = auditLogFilesDirectory.list();
+
+    //Merging of audit log files present in a directory into a single audit log file
+    if (auditLogFileNames != null && auditLogFileNames.length != 0) {
+      File auditLogFile = new File("AuditLogFile");
+      try (PrintWriter printWriter = new PrintWriter(auditLogFile,
+          "UTF-8")) {
+        for (String singleAuditLogFileName : auditLogFileNames) {
+          File singleAuditLogFile =
+              new File(auditLogFilesDirectory, singleAuditLogFileName);
+          try (BufferedReader bufferedReader =
+              new BufferedReader(
+                  new InputStreamReader(new FileInputStream(singleAuditLogFile),
+                      "UTF-8"))) {

Review Comment:
   Try to refactor this into smaller methods and write unit tests for them separately.
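   For instance, the per-file copy loop could become its own method, unit-testable with a StringWriter-backed PrintWriter (a sketch; the method name is illustrative and it uses only this class's existing imports):
   ```java
   /**
    * Append every line of one audit log file to the merged output.
    * Extracted so it can be unit-tested without touching directory listing.
    */
   private void appendLogFile(File source, PrintWriter printWriter)
       throws IOException {
     try (BufferedReader reader = new BufferedReader(new InputStreamReader(
         new FileInputStream(source), "UTF-8"))) {
       String line;
       while ((line = reader.readLine()) != null) {
         printWriter.println(line);
       }
     }
   }
   ```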



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditTool.java:
##########
@@ -0,0 +1,308 @@
+    //Merging local audit files into a single file
+    File s3aLogsDirectory = new File(s3LogsPath.getName());
+    boolean s3aLogsDirectoryCreation = false;
+    if (!s3aLogsDirectory.exists()) {
+      s3aLogsDirectoryCreation = s3aLogsDirectory.mkdir();
+    }
+    if (s3aLogsDirectoryCreation) {
+      while (listOfS3LogFiles.hasNext()) {
+        Path s3LogFilePath = listOfS3LogFiles.next().getPath();
+        File s3LogLocalFilePath =
+            new File(s3aLogsDirectory, s3LogFilePath.getName());
+        boolean localFileCreation = s3LogLocalFilePath.createNewFile();
+        if (localFileCreation) {
+          FileStatus fileStatus = s3AFileSystem.getFileStatus(s3LogFilePath);
+          long s3LogFileLength = fileStatus.getLen();
+          //Reads s3 file data into byte buffer
+          byte[] s3LogDataBuffer =

Review Comment:
   Also, if we are first copying all the files from S3 to the local FS, we can parallelize that as well.
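   For example with a small thread pool (fragment sketch; copyToLocal is a hypothetical per-file download helper, and the java.util.concurrent imports are assumed):
   ```java
   ExecutorService pool = Executors.newFixedThreadPool(8);
   try {
     List<Future<Object>> copies = new ArrayList<>();
     while (listOfS3LogFiles.hasNext()) {
       Path s3LogFilePath = listOfS3LogFiles.next().getPath();
       // one download task per S3 log file
       copies.add(pool.submit(() -> {
         copyToLocal(s3LogFilePath);    // hypothetical per-file copy helper
         return null;
       }));
     }
     for (Future<Object> copy : copies) {
       copy.get();                      // surface any download failure
     }
   } finally {
     pool.shutdown();
   }
   ```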



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestS3AAuditLogMerger.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.audit;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the S3AAuditLogMerger class methods.
+ */
+public class TestS3AAuditLogMerger {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestS3AAuditLogMerger.class);
+
+  private final S3AAuditLogMerger s3AAuditLogMerger = new S3AAuditLogMerger();
+
+  /**
+   * Sample directories and files to test.
+   */
+  private final File auditLogFile = new File("AuditLogFile");
+  private File file1;
+  private File file2;
+  private File file3;
+  private File dir1;
+  private File dir2;
+
+  /**
+   * Tests the mergeFiles method by passing a sample directory containing
+   * files with some content, and checks that the files in the directory
+   * are merged into a single file.
+   */
+  @Test
+  public void testMergeFiles() throws IOException {
+    dir1 = Files.createTempDirectory("sampleFilesDirectory").toFile();
+    file1 = File.createTempFile("sampleFile1", ".txt", dir1);
+    file2 = File.createTempFile("sampleFile2", ".txt", dir1);
+    file3 = File.createTempFile("sampleFile3", ".txt", dir1);
+    try (FileWriter fw = new FileWriter(file1);
+        FileWriter fw1 = new FileWriter(file2);
+        FileWriter fw2 = new FileWriter(file3)) {
+      fw.write("abcd");
+      fw1.write("efgh");
+      fw2.write("ijkl");
+    }
+    s3AAuditLogMerger.mergeFiles(dir1.getPath());
+    String str =
+        new String(Files.readAllBytes(Paths.get(auditLogFile.getPath())));
+    //The contents of the audit log files are separated by '\n' in the
+    // merged file; stripping '\n' keeps the assertions simple.
+    String fileText = str.replace("\n", "");
+    assertTrue("the string 'abcd' should be in the merged file",
+        fileText.contains("abcd"));
+    assertTrue("the string 'efgh' should be in the merged file",
+        fileText.contains("efgh"));
+    assertTrue("the string 'ijkl' should be in the merged file",
+        fileText.contains("ijkl"));
+  }
+
+  /**
+   * Tests that the merged file is overwritten with new contents when
+   * merges are run against different directories.
+   */
+  @Test
+  public void testMergedFile() throws IOException {
+    //Testing the merged file with contents of first directory
+    dir1 = Files.createTempDirectory("sampleFilesDirectory").toFile();
+    file1 = File.createTempFile("sampleFile1", ".txt", dir1);
+    file2 = File.createTempFile("sampleFile2", ".txt", dir1);
+    try (FileWriter fw = new FileWriter(file1);
+        FileWriter fw1 = new FileWriter(file2)) {
+      fw.write("abcd");
+      fw1.write("efgh");
+    }
+    s3AAuditLogMerger.mergeFiles(dir1.getPath());
+    String str =
+        new String(Files.readAllBytes(Paths.get(auditLogFile.getPath())));
+    //The contents of the audit log files are separated by '\n' in the
+    // merged file; stripping '\n' keeps the assertions simple.
+    String fileText = str.replace("\n", "");
+    assertTrue("the string 'abcd' should be in the merged file",
+        fileText.contains("abcd"));
+    assertTrue("the string 'efgh' should be in the merged file",
+        fileText.contains("efgh"));
+    assertFalse("the string 'ijkl' should not be in the merged file",
+        fileText.contains("ijkl"));
+
+    //Testing the merged file with contents of second directory
+    dir2 = Files.createTempDirectory("sampleFilesDirectory1").toFile();
+    file3 = File.createTempFile("sampleFile3", ".txt", dir2);
+    try (FileWriter fw2 = new FileWriter(file3)) {
+      fw2.write("ijkl");
+    }
+    s3AAuditLogMerger.mergeFiles(dir2.getPath());
+    String str1 =
+        new String(Files.readAllBytes(Paths.get(auditLogFile.getPath())));
+    //The contents of the audit log files are separated by '\n' in the
+    // merged file; stripping '\n' keeps the assertions simple.
+    String fileText1 = str1.replace("\n", "");
+    assertFalse("the string 'abcd' should not be in the merged file",
+        fileText1.contains("abcd"));
+    assertFalse("the string 'efgh' should not be in the merged file",
+        fileText1.contains("efgh"));
+    assertTrue("the string 'ijkl' should be in the merged file",
+        fileText1.contains("ijkl"));
+  }
+
+  /**
+   * Tests the mergeFiles method by passing an empty directory and
+   * checks that no merged file is created.
+   */
+  @Test
+  public void testMergeFilesEmptyDir() throws IOException {
+    dir1 = Files.createTempDirectory("emptyFilesDirectory").toFile();
+    if (auditLogFile.exists()) {
+      LOG.info("AuditLogFile already exists and we are deleting it here");
+      if (auditLogFile.delete()) {
+        LOG.debug("AuditLogFile deleted");
+      }
+    }
+    s3AAuditLogMerger.mergeFiles(dir1.getPath());
+    assertFalse(
+        "This AuditLogFile shouldn't exist if input directory is empty ",
+        auditLogFile.exists());
+  }
+
+  /**
+   * Delete all the sample directories and sample files after all tests.
+   */
+  @After
+  public void tearDown() throws Exception {
+    if (auditLogFile.exists()) {

Review Comment:
   Use a base directory and delete all the child files in one go rather than one by one.
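   e.g. a sketch (assumes org.junit.Before is imported; commons-io FileUtils is already on the classpath):
   ```java
   private File baseDir;

   @Before
   public void setUp() throws Exception {
     // every sample directory/file goes under one base directory
     baseDir = Files.createTempDirectory("s3a-audit-test").toFile();
   }

   @After
   public void tearDown() throws Exception {
     FileUtils.deleteDirectory(baseDir);  // recursive delete, one call
   }
   ```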



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org