Posted to common-commits@hadoop.apache.org by je...@apache.org on 2020/07/15 17:06:10 UTC

[hadoop] branch branch-3.2 updated: HADOOP-17099. Replace Guava Predicate with Java8+ Predicate

This is an automated email from the ASF dual-hosted git repository.

jeagles pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 8fd3dcc  HADOOP-17099. Replace Guava Predicate with Java8+ Predicate
8fd3dcc is described below

commit 8fd3dcc9ce75142b80d902693e7df518963ee690
Author: Ahmed Hussein <ah...@apache.org>
AuthorDate: Wed Jul 15 11:39:06 2020 -0500

    HADOOP-17099. Replace Guava Predicate with Java8+ Predicate
    
    Signed-off-by: Jonathan Eagles <je...@gmail.com>
    (cherry picked from commit 1f71c4ae71427a8a7476eaef64187a5643596552)
---
 .../src/main/resources/checkstyle/checkstyle.xml   |  2 +-
 .../hadoop/metrics2/impl/MetricsRecords.java       | 26 +++++---
 .../metrics2/impl/TestMetricsSystemImpl.java       | 30 +++------
 .../blockmanagement/CombinedHostFileManager.java   | 78 ++++++++--------------
 .../server/namenode/NameNodeResourceChecker.java   | 25 +++----
 .../hdfs/server/namenode/snapshot/Snapshot.java    | 19 ++----
 .../yarn/logaggregation/AggregatedLogFormat.java   | 12 +---
 .../LogAggregationFileController.java              | 22 +++---
 .../ifile/LogAggregationIndexedFileController.java | 21 +++---
 .../logaggregation/AppLogAggregatorImpl.java       | 16 ++---
 10 files changed, 91 insertions(+), 160 deletions(-)
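
The pattern applied throughout this commit is mechanical: com.google.common.base.Predicate#apply(T) becomes java.util.function.Predicate#test(T), and Guava's Iterables/Collections2 helpers give way to the Stream API. A minimal sketch of the before/after shape (illustrative only, not code from this commit):

    // Illustrative only -- shows the general Guava-to-JDK Predicate
    // migration this change applies across the files below.
    import java.util.Arrays;
    import java.util.List;
    import java.util.function.Predicate;  // replaces com.google.common.base.Predicate
    import java.util.stream.Collectors;

    public class PredicateMigrationSketch {
      public static void main(String[] args) {
        List<String> hosts = Arrays.asList("dn1:9866", "dn2:9866", "nn1:8020");

        // Before (Guava): Iterables.filter(hosts, new Predicate<String>() {
        //   @Override public boolean apply(String input) { ... }
        // });
        // After (JDK): the single abstract method is test(), not apply(),
        // and filtering moves onto the Stream API.
        Predicate<String> isDataNode = input -> input.startsWith("dn");
        List<String> dataNodes = hosts.stream()
            .filter(isDataNode)
            .collect(Collectors.toList());

        System.out.println(dataNodes);  // [dn1:9866, dn2:9866]
      }
    }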

diff --git a/hadoop-build-tools/src/main/resources/checkstyle/checkstyle.xml b/hadoop-build-tools/src/main/resources/checkstyle/checkstyle.xml
index e0a55f7..3e6ae7a 100644
--- a/hadoop-build-tools/src/main/resources/checkstyle/checkstyle.xml
+++ b/hadoop-build-tools/src/main/resources/checkstyle/checkstyle.xml
@@ -123,7 +123,7 @@
           <property name="regexp" value="true"/>
           <property name="illegalPkgs" value="^sun\.[^.]+"/>
           <property name="illegalClasses"
-            value="^com\.google\.common\.base\.(Optional|Function), ^com\.google\.common\.collect\.(ImmutableListMultimap)"/>
+            value="^com\.google\.common\.base\.(Optional|Function|Predicate), ^com\.google\.common\.collect\.(ImmutableListMultimap)"/>
         </module>
         <module name="RedundantImport"/>
         <module name="UnusedImports"/>
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/MetricsRecords.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/MetricsRecords.java
index 5d52cad..7865714 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/MetricsRecords.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/MetricsRecords.java
@@ -18,8 +18,8 @@
 
 package org.apache.hadoop.metrics2.impl;
 
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
+import java.util.function.Predicate;
+import java.util.stream.StreamSupport;
 import org.apache.hadoop.metrics2.AbstractMetric;
 import org.apache.hadoop.metrics2.MetricsRecord;
 import org.apache.hadoop.metrics2.MetricsTag;
@@ -65,16 +65,22 @@ public class MetricsRecords {
         resourceLimitMetric);
   }
 
-  private static MetricsTag getFirstTagByName(MetricsRecord record, String name) {
-    return Iterables.getFirst(Iterables.filter(record.tags(),
-        new MetricsTagPredicate(name)), null);
+  private static MetricsTag getFirstTagByName(MetricsRecord record,
+      String name) {
+    if (record.tags() == null) {
+      return null;
+    }
+    return record.tags().stream().filter(
+        new MetricsTagPredicate(name)).findFirst().orElse(null);
   }
 
   private static AbstractMetric getFirstMetricByName(
       MetricsRecord record, String name) {
-    return Iterables.getFirst(
-        Iterables.filter(record.metrics(), new AbstractMetricPredicate(name)),
-        null);
+    if (record.metrics() == null) {
+      return null;
+    }
+    return StreamSupport.stream(record.metrics().spliterator(), false)
+        .filter(new AbstractMetricPredicate(name)).findFirst().orElse(null);
   }
 
   private static class MetricsTagPredicate implements Predicate<MetricsTag> {
@@ -86,7 +92,7 @@ public class MetricsRecords {
     }
 
     @Override
-    public boolean apply(MetricsTag input) {
+    public boolean test(MetricsTag input) {
       return input.name().equals(tagName);
     }
   }
@@ -101,7 +107,7 @@ public class MetricsRecords {
     }
 
     @Override
-    public boolean apply(AbstractMetric input) {
+    public boolean test(AbstractMetric input) {
       return input.name().equals(metricName);
     }
   }
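
Note why the two helpers above take different routes: tags() evidently returns a Collection (stream() is called on it directly), while metrics() returns a bare Iterable, which has no stream() method and needs the StreamSupport bridge. A self-contained sketch of that bridge (names are illustrative, not from this commit):

    // Sketch of the Iterable-to-Stream bridge used in getFirstMetricByName:
    // Iterable has no stream() method, so StreamSupport adapts its spliterator.
    import java.util.Arrays;
    import java.util.function.Predicate;
    import java.util.stream.StreamSupport;

    public class FirstMatchSketch {
      /** First element matching p, or null -- mirrors Iterables.getFirst(filter(...), null). */
      static <T> T firstMatching(Iterable<T> items, Predicate<? super T> p) {
        if (items == null) {
          return null;
        }
        return StreamSupport.stream(items.spliterator(), false)
            .filter(p)
            .findFirst()
            .orElse(null);
      }

      public static void main(String[] args) {
        Iterable<String> names = Arrays.asList("alpha", "beta");
        System.out.println(firstMatching(names, n -> n.startsWith("b")));  // beta
      }
    }
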
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
index 47520b5..c648a43 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
@@ -23,9 +23,7 @@ import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
-
-import javax.annotation.Nullable;
-
+import java.util.stream.StreamSupport;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
@@ -38,7 +36,6 @@ import org.mockito.stubbing.Answer;
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
-import com.google.common.base.Predicate;
 import com.google.common.base.Supplier;
 import com.google.common.collect.Iterables;
 
@@ -59,7 +56,6 @@ import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableRate;
 import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -246,13 +242,9 @@ public class TestMetricsSystemImpl {
     for (Thread t : threads)
       t.join();
     assertEquals(0L, ms.droppedPubAll.value());
-    assertTrue(StringUtils.join("\n", Arrays.asList(results)),
-      Iterables.all(Arrays.asList(results), new Predicate<String>() {
-        @Override
-        public boolean apply(@Nullable String input) {
-          return input.equalsIgnoreCase("Passed");
-        }
-      }));
+    assertTrue(String.join("\n", Arrays.asList(results)),
+        Arrays.asList(results).stream().allMatch(
+            input -> input.equalsIgnoreCase("Passed")));
     ms.stop();
     ms.shutdown();
   }
@@ -482,14 +474,12 @@ public class TestMetricsSystemImpl {
       ms.onTimerEvent();
       verify(dataSink, timeout(500).times(2)).putMetrics(r1.capture());
       List<MetricsRecord> mr = r1.getAllValues();
-      Number qSize = Iterables.find(mr.get(1).metrics(),
-          new Predicate<AbstractMetric>() {
-            @Override
-            public boolean apply(@Nullable AbstractMetric input) {
-              assert input != null;
-              return input.name().equals("Sink_slowSinkQsize");
-            }
-      }).value();
+      Number qSize = StreamSupport.stream(mr.get(1).metrics().spliterator(),
+          false).filter(
+              input -> {
+                assert input != null;
+                return input.name().equals("Sink_slowSinkQsize");
+              }).findFirst().get().value();
       assertEquals(1, qSize);
     } finally {
       proceedSignal.countDown();
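
Both rewrites in this test preserve the Guava semantics: allMatch short-circuits on the first failing element exactly as Iterables.all did, and findFirst().get() throws NoSuchElementException when nothing matches, which is the same exception Iterables.find raises. A small sketch (illustrative, not from this commit):

    import java.util.Arrays;
    import java.util.List;
    import java.util.NoSuchElementException;

    public class StreamSemanticsSketch {
      public static void main(String[] args) {
        List<String> results = Arrays.asList("Passed", "passed");

        // Like Iterables.all: stops at the first non-matching element.
        boolean allPassed = results.stream()
            .allMatch(s -> s.equalsIgnoreCase("Passed"));
        System.out.println(allPassed);  // true

        try {
          results.stream().filter("Failed"::equals).findFirst().get();
        } catch (NoSuchElementException expected) {
          // Same failure mode Iterables.find would have produced.
          System.out.println("no match -> NoSuchElementException");
        }
      }
    }
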
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java
index d6a0972..44f3fbe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java
@@ -21,9 +21,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.UnmodifiableIterator;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Collections2;
 
+import java.util.stream.Collectors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -40,7 +39,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
 
-import com.google.common.base.Predicate;
+
 
 import org.apache.hadoop.hdfs.util.CombinedHostsFileReader;
 
@@ -84,37 +83,26 @@ public class CombinedHostFileManager extends HostConfigManager {
     // If the includes list is empty, act as if everything is in the
     // includes list.
     synchronized boolean isIncluded(final InetSocketAddress address) {
-      return emptyInServiceNodeLists || Iterables.any(
-          allDNs.get(address.getAddress()),
-          new Predicate<DatanodeAdminProperties>() {
-            public boolean apply(DatanodeAdminProperties input) {
-              return input.getPort() == 0 ||
-                  input.getPort() == address.getPort();
-            }
-          });
+      return emptyInServiceNodeLists || allDNs.get(address.getAddress())
+          .stream().anyMatch(
+              input -> input.getPort() == 0 ||
+                  input.getPort() == address.getPort());
     }
 
     synchronized boolean isExcluded(final InetSocketAddress address) {
-      return Iterables.any(allDNs.get(address.getAddress()),
-          new Predicate<DatanodeAdminProperties>() {
-            public boolean apply(DatanodeAdminProperties input) {
-              return input.getAdminState().equals(
-                  AdminStates.DECOMMISSIONED) &&
-                  (input.getPort() == 0 ||
-                      input.getPort() == address.getPort());
-            }
-          });
+      return allDNs.get(address.getAddress()).stream().anyMatch(
+          input -> input.getAdminState().equals(
+              AdminStates.DECOMMISSIONED) &&
+              (input.getPort() == 0 ||
+                  input.getPort() == address.getPort()));
     }
 
     synchronized String getUpgradeDomain(final InetSocketAddress address) {
-      Iterable<DatanodeAdminProperties> datanode = Iterables.filter(
-          allDNs.get(address.getAddress()),
-          new Predicate<DatanodeAdminProperties>() {
-            public boolean apply(DatanodeAdminProperties input) {
-              return (input.getPort() == 0 ||
-                  input.getPort() == address.getPort());
-            }
-          });
+      Iterable<DatanodeAdminProperties> datanode =
+          allDNs.get(address.getAddress()).stream().filter(
+              input -> (input.getPort() == 0 ||
+                  input.getPort() == address.getPort())).collect(
+              Collectors.toList());
       return datanode.iterator().hasNext() ?
           datanode.iterator().next().getUpgradeDomain() : null;
     }
@@ -129,36 +117,22 @@ public class CombinedHostFileManager extends HostConfigManager {
     }
 
     Iterable<InetSocketAddress> getExcludes() {
-      return new Iterable<InetSocketAddress>() {
-        @Override
-        public Iterator<InetSocketAddress> iterator() {
-          return new HostIterator(
-              Collections2.filter(allDNs.entries(),
-                  new Predicate<java.util.Map.Entry<InetAddress,
-                      DatanodeAdminProperties>>() {
-                    public boolean apply(java.util.Map.Entry<InetAddress,
-                        DatanodeAdminProperties> entry) {
-                      return entry.getValue().getAdminState().equals(
-                          AdminStates.DECOMMISSIONED);
-                    }
-                  }
-              ));
-        }
-      };
+      return () -> new HostIterator(
+          allDNs.entries().stream().filter(
+              entry -> entry.getValue().getAdminState().equals(
+                  AdminStates.DECOMMISSIONED)).collect(
+              Collectors.toList()));
     }
 
     synchronized long getMaintenanceExpireTimeInMS(
         final InetSocketAddress address) {
-      Iterable<DatanodeAdminProperties> datanode = Iterables.filter(
-          allDNs.get(address.getAddress()),
-          new Predicate<DatanodeAdminProperties>() {
-            public boolean apply(DatanodeAdminProperties input) {
-              return input.getAdminState().equals(
+      Iterable<DatanodeAdminProperties> datanode =
+          allDNs.get(address.getAddress()).stream().filter(
+              input -> input.getAdminState().equals(
                   AdminStates.IN_MAINTENANCE) &&
                   (input.getPort() == 0 ||
-                  input.getPort() == address.getPort());
-            }
-          });
+                      input.getPort() == address.getPort())).collect(
+              Collectors.toList());
       // if DN isn't set to maintenance state, ignore MaintenanceExpireTimeInMS
       // set in the config.
       return datanode.iterator().hasNext() ?
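
The getExcludes() rewrite above leans on Iterable<T> itself being a functional interface: it declares exactly one abstract method, iterator(), so a lambda replaces the old anonymous Iterable. A sketch of the pattern (illustrative, not from this commit):

    import java.util.Arrays;
    import java.util.List;

    public class IterableLambdaSketch {
      public static void main(String[] args) {
        List<String> excluded = Arrays.asList("dn1", "dn2");

        // Each call to iterator() yields a fresh iterator, just like the
        // old anonymous Iterable implementation did.
        Iterable<String> view = () -> excluded.iterator();

        for (String host : view) {
          System.out.println(host);
        }
      }
    }
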
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourceChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourceChecker.java
index 898f57e..07bc926 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourceChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourceChecker.java
@@ -24,7 +24,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
-
+import java.util.stream.Collectors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -34,8 +34,6 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.common.Util;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Collections2;
-import com.google.common.base.Predicate;
 
 /**
  * 
@@ -116,18 +114,15 @@ public class NameNodeResourceChecker {
     
     Collection<URI> extraCheckedVolumes = Util.stringCollectionAsURIs(conf
         .getTrimmedStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_KEY));
-    
-    Collection<URI> localEditDirs = Collections2.filter(
-        FSNamesystem.getNamespaceEditsDirs(conf),
-        new Predicate<URI>() {
-          @Override
-          public boolean apply(URI input) {
-            if (input.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
-              return true;
-            }
-            return false;
-          }
-        });
+
+    Collection<URI> localEditDirs =
+        FSNamesystem.getNamespaceEditsDirs(conf).stream().filter(
+            input -> {
+              if (input.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
+                return true;
+              }
+              return false;
+            }).collect(Collectors.toList());
 
     // Add all the local edits dirs, marking some as required if they are
     // configured as such.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java
index 515f164..f13ef61 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java
@@ -24,7 +24,7 @@ import java.text.SimpleDateFormat;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.Date;
-
+import java.util.stream.Collectors;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -38,9 +38,6 @@ import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.XAttrFeature;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import org.apache.hadoop.security.AccessControlException;
 
 /** Snapshot of a sub-tree in the namesystem. */
@@ -149,20 +146,14 @@ public class Snapshot implements Comparable<byte[]> {
   static public class Root extends INodeDirectory {
     Root(INodeDirectory other) {
       // Always preserve ACL, XAttr.
-      super(other, false, Lists.newArrayList(
-        Iterables.filter(Arrays.asList(other.getFeatures()), new Predicate<Feature>() {
-
-          @Override
-          public boolean apply(Feature input) {
-            if (AclFeature.class.isInstance(input) 
+      super(other, false, Arrays.asList(other.getFeatures()).stream().filter(
+          input -> {
+            if (AclFeature.class.isInstance(input)
                 || XAttrFeature.class.isInstance(input)) {
               return true;
             }
             return false;
-          }
-          
-        }))
-        .toArray(new Feature[0]));
+          }).collect(Collectors.toList()).toArray(new Feature[0]));
     }
 
     @Override
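
One detail in the Root constructor above: the stream is collected to a List and then converted with toArray(new Feature[0]). Stream.toArray can also produce the typed array directly, skipping the intermediate list, as in this illustrative sketch (not code from this commit):

    import java.util.Arrays;

    public class ToArraySketch {
      public static void main(String[] args) {
        // Stream.toArray(IntFunction) builds the typed array in one step.
        String[] kept = Arrays.asList("AclFeature", "XAttrFeature", "Other")
            .stream()
            .filter(n -> n.endsWith("Feature"))
            .toArray(String[]::new);
        System.out.println(Arrays.toString(kept));  // [AclFeature, XAttrFeature]
      }
    }
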
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index 4d0beaa..a8f1949 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -74,7 +74,6 @@ import org.apache.hadoop.yarn.util.Times;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 
@@ -355,14 +354,9 @@ public class AggregatedLogFormat {
               : this.logAggregationContext.getRolledLogsExcludePattern(),
           candidates, true);
 
-      Iterable<File> mask =
-          Iterables.filter(candidates, new Predicate<File>() {
-            @Override
-            public boolean apply(File next) {
-              return !alreadyUploadedLogFiles
-                  .contains(getLogFileMetaData(next));
-            }
-          });
+      Iterable<File> mask = Iterables.filter(candidates, (input) ->
+          !alreadyUploadedLogFiles
+              .contains(getLogFileMetaData(input)));
       return Sets.newHashSet(mask);
     }
 
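
This is the one place in the diff that keeps an Iterables.filter call while still dropping the Guava Predicate import. That compiles because Guava's Predicate is itself a functional interface (single abstract method apply), and a lambda never requires its target type to be imported. A sketch, assuming Guava on the classpath (illustrative, not from this commit):

    import com.google.common.collect.Iterables;
    import java.util.Arrays;

    public class GuavaLambdaTargetSketch {
      public static void main(String[] args) {
        // The lambda is inferred as com.google.common.base.Predicate via its
        // single abstract method apply(T) -- no explicit import needed.
        Iterable<String> mask = Iterables.filter(
            Arrays.asList("app.log", "app.log.tmp"),
            name -> !name.endsWith(".tmp"));
        System.out.println(mask);  // [app.log]
      }
    }
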
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
index 7db5a26..35f2d45 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
@@ -19,9 +19,7 @@
 package org.apache.hadoop.yarn.logaggregation.filecontroller;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -34,6 +32,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -521,17 +520,12 @@ public abstract class LogAggregationFileController {
       Set<FileStatus> status =
           new HashSet<FileStatus>(Arrays.asList(remoteFS.listStatus(appDir)));
 
-      Iterable<FileStatus> mask =
-          Iterables.filter(status, new Predicate<FileStatus>() {
-            @Override
-            public boolean apply(FileStatus next) {
-              return next.getPath().getName()
-                .contains(LogAggregationUtils.getNodeString(nodeId))
-                && !next.getPath().getName().endsWith(
-                    LogAggregationUtils.TMP_FILE_SUFFIX);
-            }
-          });
-      status = Sets.newHashSet(mask);
+      status = status.stream().filter(
+          next -> next.getPath().getName()
+              .contains(LogAggregationUtils.getNodeString(nodeId))
+              && !next.getPath().getName().endsWith(
+              LogAggregationUtils.TMP_FILE_SUFFIX)).collect(
+          Collectors.toSet());
       // Normally, we just need to delete one oldest log
       // before we upload a new log.
       // If we can not delete the older logs in this cycle,
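
The same shape recurs here and in the two files below: Sets.newHashSet(Iterables.filter(...)) collapses into a single stream pipeline ending in Collectors.toSet(). A minimal sketch (illustrative, not code from this commit):

    import java.util.Arrays;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class FilterToSetSketch {
      public static void main(String[] args) {
        // Filter and collect into a Set in one pipeline, with no
        // intermediate Iterable or Sets.newHashSet copy.
        Set<String> status = Arrays.asList("nm1.log", "nm1.log.tmp", "nm2.log")
            .stream()
            .filter(name -> !name.endsWith(".tmp"))
            .collect(Collectors.toSet());
        System.out.println(status.size());  // 2
      }
    }
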
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
index 2173616..dbbfb44 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
@@ -19,9 +19,7 @@
 package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -43,6 +41,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -677,16 +676,12 @@ public class LogAggregationIndexedFileController
   public Map<String, Long> parseCheckSumFiles(
       List<FileStatus> fileList) throws IOException {
     Map<String, Long> checkSumFiles = new HashMap<>();
-    Set<FileStatus> status = new HashSet<FileStatus>(fileList);
-    Iterable<FileStatus> mask =
-        Iterables.filter(status, new Predicate<FileStatus>() {
-          @Override
-          public boolean apply(FileStatus next) {
-            return next.getPath().getName().endsWith(
-                CHECK_SUM_FILE_SUFFIX);
-          }
-        });
-    status = Sets.newHashSet(mask);
+    Set<FileStatus> status =
+        new HashSet<>(fileList).stream().filter(
+            next -> next.getPath().getName().endsWith(
+                CHECK_SUM_FILE_SUFFIX)).collect(
+            Collectors.toSet());
+
     FileContext fc = null;
     for (FileStatus file : status) {
       FSDataInputStream checksumFileInputStream = null;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 94a9700..840a90c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,8 +73,6 @@ import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.Times;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 
 
@@ -665,16 +664,9 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
         .getCurrentUpLoadedFileMeta());
       // if any of the previous uploaded logs have been deleted,
       // we need to remove them from alreadyUploadedLogs
-      Iterable<String> mask =
-          Iterables.filter(uploadedFileMeta, new Predicate<String>() {
-            @Override
-            public boolean apply(String next) {
-              return logValue.getAllExistingFilesMeta().contains(next);
-            }
-          });
-
-      this.uploadedFileMeta = Sets.newHashSet(mask);
-
+      this.uploadedFileMeta = uploadedFileMeta.stream().filter(
+          next -> logValue.getAllExistingFilesMeta().contains(next)).collect(
+          Collectors.toSet());
       // need to return files uploaded or older-than-retention clean up.
       return Sets.union(logValue.getCurrentUpLoadedFilesPath(),
           logValue.getObsoleteRetentionLogFiles());

